mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-12832: Implicit invalidate metadata on event failures
At present, a failure in event processing requires a manual invalidate metadata. This patch implicitly invalidates the table upon failures in the processing of table events when the new 'invalidate_metadata_on_event_processing_failure' flag is enabled. A new 'invalidate_global_metadata_on_event_processing_failure' flag is also added to perform a global invalidate metadata automatically when the event processor goes to a non-active state. Note: Also introduced a config 'inject_process_event_failure_event_types' for automated tests to simulate event processor failures. This config specifies which event types can be intentionally failed. It should only be used for testing purposes. Needs IMPALA-12851 as a prerequisite. Testing: - Added end-to-end tests to mimic failures in the event processor and verified that the event processor stays active - Added a unit test to verify the 'auto_global_invalidate_metadata' config - Passed FE tests Co-Authored-by: Sai Hemanth Gantasala <saihemanth@cloudera.com> Change-Id: Ia67fc04c995802d3b6b56f79564bf0954b012c6c Reviewed-on: http://gerrit.cloudera.org:8080/21065 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
9702724959
commit
b7ddbcad0d
@@ -163,6 +163,32 @@ DEFINE_int32(topic_update_log_gc_frequency, 1000, "Frequency at which the entrie
|
||||
"of the catalog topic update log are garbage collected. An entry may survive "
|
||||
"for (2 * TOPIC_UPDATE_LOG_GC_FREQUENCY) - 1 topic updates.");
|
||||
|
||||
DEFINE_bool(invalidate_metadata_on_event_processing_failure, true,
|
||||
"This configuration is used to invalidate metadata for table(s) upon event process "
|
||||
"failure other than HMS connection issues. The default value is true. When enabled, "
|
||||
"invalidate metadata is performed automatically upon event process failure. "
|
||||
"Otherwise, failure can put metastore event processor in non-active state.");
|
||||
|
||||
DEFINE_bool(invalidate_global_metadata_on_event_processing_failure, false,
|
||||
"This configuration is used to global invalidate metadata when "
|
||||
"invalidate_metadata_on_event_processing_failure cannot invalidate metadata for "
|
||||
"table(s). The default value is false. When enabled, global invalidate metadata is "
|
||||
"performed automatically. Otherwise, failure can put metastore event processor in "
|
||||
"non-active state.");
|
||||
|
||||
DEFINE_string_hidden(inject_process_event_failure_event_types, "",
|
||||
"This configuration is used to inject event processing failure for an event type "
|
||||
"randomly. It is used for debugging purpose. Empty string indicates no failure "
|
||||
"injection and it is default behavior. Valid values are comma separated event types "
|
||||
"as specified in MetastoreEventType enum. This config is only for testing purpose "
|
||||
"and it should not be set in production environments.");
|
||||
|
||||
// Hidden, test-only flag that accompanies 'inject_process_event_failure_event_types'.
// Adjacent string literals are concatenated verbatim by the compiler, so every
// fragment except the last must end with a space to keep the help text readable.
DEFINE_double_hidden(inject_process_event_failure_ratio, 1.0,
    "This configuration is used in conjunction with the config "
    "'inject_process_event_failure_event_types', to define what is the ratio of an "
    "event failure. If the generated random number is lesser than this value, then we "
    "fail the event processor(EP).");
|
||||
|
||||
DECLARE_string(state_store_host);
|
||||
DECLARE_int32(state_store_port);
|
||||
DECLARE_string(state_store_2_host);
|
||||
|
||||
@@ -117,6 +117,10 @@ DECLARE_string(hostname);
|
||||
DECLARE_bool(allow_catalog_cache_op_from_masked_users);
|
||||
DECLARE_int32(topic_update_log_gc_frequency);
|
||||
DECLARE_string(debug_actions);
|
||||
DECLARE_bool(invalidate_metadata_on_event_processing_failure);
|
||||
DECLARE_bool(invalidate_global_metadata_on_event_processing_failure);
|
||||
DECLARE_string(inject_process_event_failure_event_types);
|
||||
DECLARE_double(inject_process_event_failure_ratio);
|
||||
|
||||
// HS2 SAML2.0 configuration
|
||||
// Defined here because TAG_FLAG caused issues in global-flags.cc
|
||||
@@ -455,6 +459,13 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
|
||||
FLAGS_allow_catalog_cache_op_from_masked_users);
|
||||
cfg.__set_topic_update_log_gc_frequency(FLAGS_topic_update_log_gc_frequency);
|
||||
cfg.__set_debug_actions(FLAGS_debug_actions);
|
||||
cfg.__set_invalidate_metadata_on_event_processing_failure(
|
||||
FLAGS_invalidate_metadata_on_event_processing_failure);
|
||||
cfg.__set_invalidate_global_metadata_on_event_processing_failure(
|
||||
FLAGS_invalidate_global_metadata_on_event_processing_failure);
|
||||
cfg.__set_inject_process_event_failure_event_types(
|
||||
FLAGS_inject_process_event_failure_event_types);
|
||||
cfg.__set_inject_process_event_failure_ratio(FLAGS_inject_process_event_failure_ratio);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -280,4 +280,12 @@ struct TBackendGflags {
|
||||
124: required i32 topic_update_log_gc_frequency
|
||||
|
||||
125: required string debug_actions
|
||||
|
||||
126: required bool invalidate_metadata_on_event_processing_failure
|
||||
|
||||
127: required bool invalidate_global_metadata_on_event_processing_failure
|
||||
|
||||
128: required string inject_process_event_failure_event_types
|
||||
|
||||
129: required double inject_process_event_failure_ratio
|
||||
}
|
||||
|
||||
@@ -633,6 +633,11 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isEventProcessingDisabled() {
|
||||
return false;
|
||||
|
||||
@@ -830,6 +830,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
|
||||
public static class CommitTxnEvent extends MetastoreEvent {
|
||||
private final CommitTxnMessage commitTxnMessage_;
|
||||
private final long txnId_;
|
||||
private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
|
||||
|
||||
public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
|
||||
NotificationEvent event) {
|
||||
@@ -848,6 +849,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
|
||||
// To ensure no memory leaking in case an exception is thrown, we remove entries
|
||||
// at first.
|
||||
Set<TableWriteId> committedWriteIds = catalog_.removeWriteIds(txnId_);
|
||||
tableWriteIds_ = committedWriteIds;
|
||||
// Via getAllWriteEventInfo, we can get data insertion info for transactional tables
|
||||
// even though there are no insert events generated for transactional tables. Note
|
||||
// that we cannot get DDL info from this API.
|
||||
@@ -876,6 +878,26 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
if (!BackendConfig.INSTANCE.isInvalidateMetadataOnEventProcessFailureEnabled()
|
||||
|| !canInvalidateTable(e)) {
|
||||
return false;
|
||||
}
|
||||
errorLog(
|
||||
"Invalidating tables in transaction due to exception during event processing",
|
||||
e);
|
||||
Set<TableName> tableNames =
|
||||
tableWriteIds_.stream()
|
||||
.map(writeId -> new TableName(writeId.getDbName(), writeId.getTblName()))
|
||||
.collect(Collectors.toSet());
|
||||
for (TableName tableName : tableNames) {
|
||||
errorLog("Invalidate table {}.{}", tableName.getDb(), tableName.getTbl());
|
||||
catalog_.invalidateTableIfExists(tableName.getDb(), tableName.getTbl());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void addCommittedWriteIdsToTables(Set<TableWriteId> tableWriteIds)
|
||||
throws CatalogException {
|
||||
for (TableWriteId tableWriteId: tableWriteIds) {
|
||||
|
||||
@@ -332,6 +332,9 @@ public class CatalogServiceCatalog extends Catalog {
|
||||
// Table properties that require file metadata reload
|
||||
private final Set<String> whitelistedTblProperties_;
|
||||
|
||||
// A variable to test expected failed events
|
||||
private final Set<String> failureEventsForTesting_;
|
||||
|
||||
// Total number of dbs, tables and functions in the catalog cache.
|
||||
// Updated in each catalog topic update (getCatalogDelta()).
|
||||
private int numDbs_ = 0;
|
||||
@@ -387,6 +390,13 @@ public class CatalogServiceCatalog extends Catalog {
|
||||
whitelist)) {
|
||||
whitelistedTblProperties_.add(tblProps);
|
||||
}
|
||||
failureEventsForTesting_ = Sets.newHashSet();
|
||||
String failureEvents =
|
||||
BackendConfig.INSTANCE.getProcessEventFailureEventTypes().toUpperCase();
|
||||
for (String tblProps :
|
||||
Splitter.on(',').trimResults().omitEmptyStrings().split(failureEvents)) {
|
||||
failureEventsForTesting_.add(tblProps);
|
||||
}
|
||||
}
|
||||
|
||||
public void startEventsProcessor() {
|
||||
@@ -4046,4 +4056,6 @@ public class CatalogServiceCatalog extends Catalog {
|
||||
public Set<String> getWhitelistedTblProperties() {
|
||||
return whitelistedTblProperties_;
|
||||
}
|
||||
|
||||
public Set<String> getFailureEventsForTesting() { return failureEventsForTesting_; }
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ import org.apache.impala.catalog.Db;
|
||||
import org.apache.impala.catalog.FileMetadataLoadOpts;
|
||||
import org.apache.impala.catalog.HdfsTable;
|
||||
import org.apache.impala.catalog.IncompleteTable;
|
||||
import org.apache.impala.catalog.MetastoreClientInstantiationException;
|
||||
import org.apache.impala.catalog.TableLoadingException;
|
||||
import org.apache.impala.catalog.TableNotFoundException;
|
||||
import org.apache.impala.catalog.TableNotLoadedException;
|
||||
@@ -655,6 +656,19 @@ public class MetastoreEvents {
|
||||
return dbName_ + "." + tblName_;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to inject error randomly for certain events during the processing.
|
||||
* It is used for testing purpose.
|
||||
*/
|
||||
private void injectErrorIfNeeded() {
|
||||
if (catalog_.getFailureEventsForTesting().contains(eventType_.toString())) {
|
||||
double random = Math.random();
|
||||
if (random < BackendConfig.INSTANCE.getProcessEventFailureRatio()) {
|
||||
throw new RuntimeException("Event processing failed due to error injection");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process this event if it is enabled based on the flags on this object
|
||||
*
|
||||
@@ -677,6 +691,7 @@ public class MetastoreEvents {
|
||||
}
|
||||
}
|
||||
process();
|
||||
injectErrorIfNeeded();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -740,6 +755,25 @@ public class MetastoreEvents {
|
||||
protected abstract void process()
|
||||
throws MetastoreNotificationException, CatalogException;
|
||||
|
||||
/**
|
||||
* Process event failure handles the exception occurred during processing.
|
||||
* @param e Exception occurred during process
|
||||
* @return Returns true when failure is handled. Otherwise, false
|
||||
*/
|
||||
protected abstract boolean onFailure(Exception e);
|
||||
|
||||
protected boolean canInvalidateTable(Exception e) {
|
||||
if (e instanceof MetastoreClientInstantiationException) {
|
||||
// All runtime exceptions except this exception are considered for invalidation.
|
||||
return false;
|
||||
}
|
||||
// This RuntimeException covers all the exceptions from
|
||||
// com.google.common.base.Preconditions methods. IllegalStateException is one such
|
||||
// exception seen in IMPALA-12827.
|
||||
return (e instanceof RuntimeException) || (e instanceof CatalogException)
|
||||
|| (e instanceof MetastoreNotificationNeedsInvalidateException);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to get debug string with helpful event information prepended to the
|
||||
* message. This can be used to generate helpful exception messages
|
||||
@@ -818,6 +852,17 @@ public class MetastoreEvents {
|
||||
LOG.warn(formatString, formatArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to log error for an event
|
||||
*/
|
||||
protected void errorLog(String logFormattedStr, Object... args) {
|
||||
if (!LOG.isErrorEnabled()) return;
|
||||
String formatString =
|
||||
new StringBuilder(LOG_FORMAT_EVENT_ID_TYPE).append(logFormattedStr).toString();
|
||||
Object[] formatArgs = getLogFormatArgs(args);
|
||||
LOG.error(formatString, formatArgs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Search for a inverse event (for example drop_table is a inverse event for
|
||||
* create_table) for this event from a given list of notificationEvents starting for
|
||||
@@ -1261,6 +1306,20 @@ public class MetastoreEvents {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
if (!BackendConfig.INSTANCE.isInvalidateMetadataOnEventProcessFailureEnabled()) {
|
||||
return false;
|
||||
}
|
||||
boolean isInvalidate = canInvalidateTable(e);
|
||||
if (isInvalidate) {
|
||||
errorLog("Invalidate table {}.{} due to exception during event processing",
|
||||
dbName_, tblName_, e);
|
||||
catalog_.invalidateTableIfExists(dbName_, tblName_);
|
||||
}
|
||||
return isInvalidate;
|
||||
}
|
||||
|
||||
protected abstract void processTableEvent() throws MetastoreNotificationException,
|
||||
CatalogException;
|
||||
}
|
||||
@@ -1328,6 +1387,12 @@ public class MetastoreEvents {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
// TODO: Need to check db event failure in future
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1393,6 +1458,11 @@ public class MetastoreEvents {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRemovedAfter(List<MetastoreEvent> events) {
|
||||
Preconditions.checkNotNull(events);
|
||||
@@ -1734,6 +1804,27 @@ public class MetastoreEvents {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
if (!BackendConfig.INSTANCE.isInvalidateMetadataOnEventProcessFailureEnabled()
|
||||
|| !canInvalidateTable(e)) {
|
||||
return false;
|
||||
}
|
||||
if (isRename()) {
|
||||
/* In case of rename table, not invalidating tables due of following reasons:
|
||||
1. When failure happened before old table is removed from catalog,
|
||||
invalidateTableIfExists may trigger table load for a non-existing table later.
|
||||
2. When failure happened before adding new table to catalog,
|
||||
invalidateTableIfExists does not invalidate table
|
||||
*/
|
||||
errorLog(
|
||||
"Rename table {}.{} to {}.{} failed due to exception during event processing",
|
||||
tableBefore_.getDbName(), tableBefore_.getTableName(), dbName_, tblName_, e);
|
||||
return false;
|
||||
}
|
||||
return super.onFailure(e);
|
||||
}
|
||||
|
||||
private void handleEventSyncTurnedOn() throws DatabaseNotFoundException,
|
||||
MetastoreNotificationNeedsInvalidateException {
|
||||
// check if the table exists or not. 1) if the table doesn't exist create an
|
||||
@@ -1975,6 +2066,11 @@ public class MetastoreEvents {
|
||||
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -2988,6 +3084,7 @@ public class MetastoreEvents {
|
||||
*/
|
||||
public static class AbortTxnEvent extends MetastoreEvent {
|
||||
private final long txnId_;
|
||||
private Set<TableWriteId> tableWriteIds_ = Collections.emptySet();
|
||||
|
||||
AbortTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
|
||||
NotificationEvent event) {
|
||||
@@ -3004,9 +3101,9 @@ public class MetastoreEvents {
|
||||
@Override
|
||||
protected void process() throws MetastoreNotificationException {
|
||||
try {
|
||||
Set<TableWriteId> tableWriteIds = catalog_.getWriteIds(txnId_);
|
||||
infoLog("Adding {} aborted write ids", tableWriteIds.size());
|
||||
addAbortedWriteIdsToTables(tableWriteIds);
|
||||
tableWriteIds_ = catalog_.getWriteIds(txnId_);
|
||||
infoLog("Adding {} aborted write ids", tableWriteIds_.size());
|
||||
addAbortedWriteIdsToTables(tableWriteIds_);
|
||||
} catch (CatalogException e) {
|
||||
throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
|
||||
+ "mark aborted write ids to table for txn {}. Event processing cannot "
|
||||
@@ -3017,6 +3114,26 @@ public class MetastoreEvents {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
if (!BackendConfig.INSTANCE.isInvalidateMetadataOnEventProcessFailureEnabled()
|
||||
|| !canInvalidateTable(e)) {
|
||||
return false;
|
||||
}
|
||||
errorLog(
|
||||
"Invalidating tables in transaction due to exception during event processing",
|
||||
e);
|
||||
Set<TableName> tableNames =
|
||||
tableWriteIds_.stream()
|
||||
.map(writeId -> new TableName(writeId.getDbName(), writeId.getTblName()))
|
||||
.collect(Collectors.toSet());
|
||||
for (TableName tableName : tableNames) {
|
||||
errorLog("Invalidate table {}.{}", tableName.getDb(), tableName.getTbl());
|
||||
catalog_.invalidateTableIfExists(tableName.getDb(), tableName.getTbl());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void addAbortedWriteIdsToTables(Set<TableWriteId> tableWriteIds)
|
||||
throws CatalogException {
|
||||
for (TableWriteId tableWriteId: tableWriteIds) {
|
||||
@@ -3138,5 +3255,10 @@ public class MetastoreEvents {
|
||||
throw new UnsupportedOperationException("Self-event evaluation is not needed for "
|
||||
+ "this event type");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean onFailure(Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,7 @@ import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
|
||||
import org.apache.impala.common.Metrics;
|
||||
import org.apache.impala.common.PrintUtils;
|
||||
import org.apache.impala.compat.MetastoreShim;
|
||||
import org.apache.impala.service.BackendConfig;
|
||||
import org.apache.impala.service.CatalogOpExecutor;
|
||||
import org.apache.impala.thrift.TEventProcessorMetrics;
|
||||
import org.apache.impala.thrift.TEventProcessorMetricsSummaryResponse;
|
||||
@@ -960,6 +961,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
||||
LOG.warn(String.format(
|
||||
"Event processing is skipped since status is %s. Last synced event id is %d",
|
||||
currentStatus, lastSyncedEventId_.get()));
|
||||
tryAutoGlobalInvalidateOnFailure();
|
||||
return;
|
||||
}
|
||||
// fetch the current notification event id. We assume that the polling interval
|
||||
@@ -980,6 +982,7 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
||||
LOG.error(msg, ex);
|
||||
eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' +
|
||||
ExceptionUtils.getFullStackTrace(ex);
|
||||
tryAutoGlobalInvalidateOnFailure();
|
||||
} catch (Exception ex) {
|
||||
// There are lot of Preconditions which can throw RuntimeExceptions when we
|
||||
// process events this catch all exception block is needed so that the scheduler
|
||||
@@ -990,6 +993,28 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
||||
eventProcessorErrorMsg_ = LocalDateTime.now().toString() + '\n' + msg + '\n' +
|
||||
ExceptionUtils.getFullStackTrace(ex);
|
||||
dumpEventInfoToLog(currentEvent_);
|
||||
tryAutoGlobalInvalidateOnFailure();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method does global invalidation when
|
||||
* invalidate_global_metadata_on_event_processing_failure flag is enabled and the
|
||||
* event processor is in error state or need invalidate state
|
||||
*/
|
||||
private void tryAutoGlobalInvalidateOnFailure() {
|
||||
EventProcessorStatus currentStatus = eventProcessorStatus_;
|
||||
if (BackendConfig.INSTANCE.isInvalidateGlobalMetadataOnEventProcessFailureEnabled()
|
||||
&& ((currentStatus == EventProcessorStatus.ERROR)
|
||||
|| (currentStatus == EventProcessorStatus.NEEDS_INVALIDATE))) {
|
||||
try {
|
||||
LOG.error("Triggering auto global invalidation");
|
||||
catalog_.reset();
|
||||
eventProcessorErrorMsg_ = null;
|
||||
} catch (Exception e) {
|
||||
// Catching generic exception so that scheduler thread does not die silently
|
||||
LOG.error("Automatic global invalidate metadata failed", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1163,6 +1188,16 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
||||
event.processIfEnabled();
|
||||
long elapsedTimeMs = System.currentTimeMillis() - startMs;
|
||||
eventProcessingTime.put(event, elapsedTimeMs);
|
||||
} catch (Exception processingEx) {
|
||||
try {
|
||||
if (!event.onFailure(processingEx)) {
|
||||
event.errorLog("Unable to handle event processing failure");
|
||||
throw processingEx;
|
||||
}
|
||||
} catch (Exception onFailureEx) {
|
||||
event.errorLog("Failed to handle event processing failure", onFailureEx);
|
||||
throw processingEx;
|
||||
}
|
||||
}
|
||||
deleteEventLog_.garbageCollect(event.getEventId());
|
||||
lastSyncedEventId_.set(event.getEventId());
|
||||
@@ -1226,7 +1261,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
|
||||
/**
|
||||
* Updates the current states to the given status.
|
||||
*/
|
||||
private synchronized void updateStatus(EventProcessorStatus toStatus) {
|
||||
@VisibleForTesting
|
||||
public synchronized void updateStatus(EventProcessorStatus toStatus) {
|
||||
eventProcessorStatus_ = toStatus;
|
||||
}
|
||||
|
||||
|
||||
@@ -459,4 +459,24 @@ public class BackendConfig {
|
||||
}
|
||||
|
||||
public String debugActions() { return backendCfg_.debug_actions; }
|
||||
|
||||
public boolean isInvalidateMetadataOnEventProcessFailureEnabled() {
|
||||
return backendCfg_.invalidate_metadata_on_event_processing_failure;
|
||||
}
|
||||
|
||||
public boolean isInvalidateGlobalMetadataOnEventProcessFailureEnabled() {
|
||||
return backendCfg_.invalidate_global_metadata_on_event_processing_failure;
|
||||
}
|
||||
|
||||
public void setInvalidateGlobalMetadataOnEventProcessFailure(boolean isEnabled) {
|
||||
backendCfg_.invalidate_global_metadata_on_event_processing_failure = isEnabled;
|
||||
}
|
||||
|
||||
public String getProcessEventFailureEventTypes() {
|
||||
return backendCfg_.inject_process_event_failure_event_types;
|
||||
}
|
||||
|
||||
public double getProcessEventFailureRatio() {
|
||||
return backendCfg_.inject_process_event_failure_ratio;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3718,6 +3718,28 @@ public class MetastoreEventsProcessorTest {
|
||||
.getCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEventProcessorErrorState() throws Exception {
|
||||
boolean prevFlagVal =
|
||||
BackendConfig.INSTANCE.isInvalidateGlobalMetadataOnEventProcessFailureEnabled();
|
||||
try {
|
||||
BackendConfig.INSTANCE.setInvalidateGlobalMetadataOnEventProcessFailure(true);
|
||||
createDatabase(TEST_DB_NAME, null);
|
||||
eventsProcessor_.updateStatus(EventProcessorStatus.NEEDS_INVALIDATE);
|
||||
eventsProcessor_.processEvents();
|
||||
// wait and check EP status
|
||||
assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
|
||||
dropDatabaseCascadeFromHMS();
|
||||
eventsProcessor_.updateStatus(EventProcessorStatus.ERROR);
|
||||
eventsProcessor_.processEvents();
|
||||
// wait and check EP status
|
||||
assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
|
||||
} finally {
|
||||
BackendConfig.INSTANCE.setInvalidateGlobalMetadataOnEventProcessFailure(
|
||||
prevFlagVal);
|
||||
}
|
||||
}
|
||||
|
||||
private void insertIntoTable(String dbName, String tableName) throws Exception {
|
||||
try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
|
||||
org.apache.hadoop.hive.metastore.api.Table msTable =
|
||||
|
||||
@@ -950,6 +950,16 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
assert len(result.data) <= 1, 'Multiple values returned from scalar'
|
||||
return result.data[0] if len(result.data) == 1 else None
|
||||
|
||||
@classmethod
@execute_wrapper
def execute_scalar_expect_success(cls, impalad_client, query, query_options=None,
    user=None):
  """Runs a scalar query that is expected to succeed. Asserts that the query
  succeeded and returned at most one value; returns that value, or None if the result
  is empty."""
  result = cls.__execute_query(impalad_client, query, query_options, user)
  assert result.success
  assert len(result.data) <= 1, 'Multiple values returned from scalar'
  if len(result.data) == 1:
    return result.data[0]
  return None
|
||||
|
||||
def exec_and_compare_hive_and_impala_hs2(self, stmt, compare = lambda x, y: x == y):
|
||||
"""Compare Hive and Impala results when executing the same statment over HS2"""
|
||||
# execute_using_jdbc expects a Query object. Convert the query string into a Query
|
||||
@@ -1033,13 +1043,15 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
|
||||
# TODO(todd) make this use Thrift to connect to HS2 instead of shelling
|
||||
# out to beeline for better performance
|
||||
def run_stmt_in_hive(self, stmt, username=None):
|
||||
@classmethod
|
||||
def run_stmt_in_hive(cls, stmt, username=None):
|
||||
"""Run a statement in Hive by Beeline."""
|
||||
LOG.info("-- executing in HiveServer2\n\n" + stmt + "\n")
|
||||
url = 'jdbc:hive2://' + pytest.config.option.hive_server2
|
||||
return self.run_stmt_in_beeline(url, username, stmt)
|
||||
return cls.run_stmt_in_beeline(url, username, stmt)
|
||||
|
||||
def run_stmt_in_beeline(self, url, username, stmt):
|
||||
@classmethod
|
||||
def run_stmt_in_beeline(cls, url, username, stmt):
|
||||
"""
|
||||
Run a statement by Beeline, returning stdout if successful and throwing
|
||||
RuntimeError(stderr) if not.
|
||||
|
||||
386
tests/custom_cluster/test_event_processing_error.py
Normal file
386
tests/custom_cluster/test_event_processing_error.py
Normal file
@@ -0,0 +1,386 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import, division, print_function
|
||||
from builtins import range
|
||||
|
||||
from hive_metastore.ttypes import FireEventRequest
|
||||
from hive_metastore.ttypes import FireEventRequestData
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.skip import (SkipIfCatalogV2)
|
||||
from tests.metadata.test_event_processing_base import TestEventProcessingBase
|
||||
from tests.util.acid_txn import AcidTxn
|
||||
from tests.util.event_processor_utils import EventProcessorUtils
|
||||
|
||||
|
||||
@SkipIfCatalogV2.hms_event_polling_disabled()
|
||||
class TestEventProcessingError(CustomClusterTestSuite):
|
||||
"""
|
||||
Tests for verify event processor not going into error state whenever there are
|
||||
runtime exceptions while processing events.
|
||||
"""
|
||||
@classmethod
def get_workload(cls):
  """Returns the name of the workload this test suite runs against."""
  # The first parameter of a classmethod is the class, so it is named 'cls'.
  return 'functional-query'
|
||||
|
||||
@CustomClusterTestSuite.with_args(
|
||||
impalad_args="--use_local_catalog=true",
|
||||
catalogd_args="--catalog_topic_mode=minimal "
|
||||
"--invalidate_metadata_on_event_processing_failure=false "
|
||||
"--inject_process_event_failure_event_types='ALTER_TABLE' "
|
||||
"--hms_event_polling_interval_s=2")
|
||||
def test_event_processor_error_sanity_check(self, unique_database):
|
||||
"""Tests event processor going into error state for alter table event"""
|
||||
tbl_name = "hive_alter_table"
|
||||
self.__create_table_and_load__(unique_database, tbl_name, False, False)
|
||||
self.run_stmt_in_hive(
|
||||
"alter table {}.{} set owner user `test-user`"
|
||||
.format(unique_database, tbl_name))
|
||||
try:
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
except Exception:
|
||||
assert EventProcessorUtils.get_event_processor_status() == "ERROR"
|
||||
self.client.execute("INVALIDATE METADATA")
|
||||
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
|
||||
|
||||
@CustomClusterTestSuite.with_args(
  impalad_args="--use_local_catalog=true",
  catalogd_args="--catalog_topic_mode=minimal "
                "--inject_process_event_failure_event_types='ALTER_TABLE' "
                "--hms_event_polling_interval_s=2")
def test_event_processor_error_alter_table(self, unique_database):
  """Tests that the event processor stays active when a failure is injected for an
  alter table event, and that the owner change made in Hive is visible afterwards"""
  tbl_name = "hive_alter_table"
  self.__create_table_and_load__(unique_database, tbl_name, False, False)
  self.run_stmt_in_hive(
    "alter table {}.{} set owner user `test-user`"
    .format(unique_database, tbl_name))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  result = self.client.execute("describe formatted {}.{}"
    .format(unique_database, tbl_name))
  self.verify_owner_property(result, 'test-user')
|
||||
|
||||
@CustomClusterTestSuite.with_args(
  impalad_args="--use_local_catalog=true",
  catalogd_args="--catalog_topic_mode=minimal "
                "--inject_process_event_failure_event_types='ADD_PARTITION' "
                "--hms_event_polling_interval_s=2")
def test_event_processor_error_add_partition(self, unique_database):
  """Tests that the event processor stays active when a failure is injected for an
  add partition event, and that the partition added in Hive is visible afterwards"""
  tbl_name = "hive_table_add_partition"
  self.__create_table_and_load__(unique_database, tbl_name, False, True)
  self.client.execute("describe {}.{}".format(unique_database, tbl_name))
  self.run_stmt_in_hive(
    "alter table {}.{} add partition(year=2024)"
    .format(unique_database, tbl_name))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  result = self.client.execute("show partitions {}.{}"
    .format(unique_database, tbl_name))
  # First line is the header. Only one partition should be shown so the
  # result has two lines.
  assert "hive_table_add_partition/year=2024" in result.get_data()
  assert len(result.data) == 2
|
||||
|
||||
@CustomClusterTestSuite.with_args(
  impalad_args="--use_local_catalog=true",
  catalogd_args="--catalog_topic_mode=minimal "
                "--inject_process_event_failure_event_types='ALTER_PARTITION' "
                "--hms_event_polling_interval_s=2")
def test_event_processor_error_alter_partition(self, unique_database):
  """Tests that the event processor stays active when a failure is injected for an
  alter partition event, and that the partition is still listed afterwards"""
  tbl_name = "hive_table_alter_partition"
  self.__create_table_and_load__(unique_database, tbl_name, False, True)
  self.run_stmt_in_hive(
    "alter table {}.{} add partition(year=2024)"
    .format(unique_database, tbl_name))
  self.run_stmt_in_hive(
    "analyze table {}.{} compute statistics"
    .format(unique_database, tbl_name))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  result = self.client.execute("show partitions {}.{}"
    .format(unique_database, tbl_name))
  assert "2024" in result.get_data()
  assert len(result.data) == 2
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ALTER_PARTITIONS' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_alter_partitions(self, unique_database):
  """Runs with failures injected for ALTER_PARTITIONS (batch alter partitions)
  events: five partitions are added through Impala, statistics are computed from
  Hive, and the event processor must still report ACTIVE afterwards."""
  table = "hive_table_alter_partitions"
  self.__create_table_and_load__(unique_database, table, False, True)
  add_partition_template = "alter table {}.{} add partition(year={})"
  for year in range(5):
    self.client.execute(add_partition_template.format(unique_database, table, year))
  analyze_stmt = "analyze table {}.{} compute statistics"
  self.run_stmt_in_hive(analyze_stmt.format(unique_database, table))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  show_partitions_stmt = "show partitions {}.{}".format(unique_database, table)
  partitions = self.client.execute(show_partitions_stmt)
  # Every partition value added above must appear in the listing.
  for year in range(5):
    assert str(year) in partitions.get_data()
  assert len(partitions.data) == 6
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='INSERT' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_insert_event(self, unique_database):
  """Runs with failures injected for INSERT events: two three-row inserts are
  executed through Impala, then the event processor must report ACTIVE and the
  table must contain six rows."""
  table = "hive_table_insert"
  self.__create_table_and_load__(unique_database, table, False, True)
  insert_stmt = ("insert into {}.{} partition(year=2024) values (1),(2),(3)"
                 .format(unique_database, table))
  for _ in range(2):
    self.client.execute(insert_stmt)
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  count_stmt = "select count(*) from {}.{}".format(unique_database, table)
  row_count = self.client.execute(count_stmt)
  assert row_count.data[0] == '6'
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='INSERT_PARTITIONS' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_insert_events(self, unique_database):
  """Runs with failures injected for INSERT_PARTITIONS events: the year=2024
  partition is created from Hive first, then two three-row inserts into it are
  executed through Impala. The event processor must report ACTIVE and the table
  must contain six rows."""
  table = "hive_table_insert_partitions"
  self.__create_table_and_load__(unique_database, table, False, True)
  add_partition_stmt = "alter table {}.{} add partition(year=2024)"
  self.run_stmt_in_hive(add_partition_stmt.format(unique_database, table))
  # Let the add-partition event be processed before the inserts are issued.
  EventProcessorUtils.wait_for_event_processing(self)
  insert_stmt = ("insert into {}.{} partition(year=2024) values (1),(2),(3)"
                 .format(unique_database, table))
  for _ in range(2):
    self.client.execute(insert_stmt)
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  count_stmt = "select count(*) from {}.{}".format(unique_database, table)
  row_count = self.client.execute(count_stmt)
  assert row_count.data[0] == '6'
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='DROP_PARTITION' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_drop_partition(self, unique_database):
  """Runs with failures injected for DROP_PARTITION events: Hive adds and then
  drops the year=2024 partition. The event processor must report ACTIVE and the
  dropped partition must no longer be listed by Impala."""
  table = "hive_table_drop_partition"
  self.__create_table_and_load__(unique_database, table, False, True)
  # Add the partition and drop it again, both from Hive.
  hive_stmt_templates = (
      "alter table {}.{} add partition(year=2024)",
      "alter table {}.{} drop partition(year=2024)",
  )
  for template in hive_stmt_templates:
    self.run_stmt_in_hive(template.format(unique_database, table))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  show_partitions_stmt = "show partitions {}.{}".format(unique_database, table)
  partitions = self.client.execute(show_partitions_stmt)
  assert "2024" not in partitions.get_data()
  assert len(partitions.data) == 1
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='RELOAD' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_reload_event(self, unique_database):
  """Runs with failures injected for RELOAD events: fires a table-level refresh,
  a partition-level refresh and a table-level invalidate through the HMS listener
  API, then checks that the event processor reports ACTIVE and the table can
  still be described."""
  table = "hive_table_reload_event"
  self.__create_table_and_load__(unique_database, table, False, True)
  self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)".format(unique_database, table))

  def build_request(event_data):
    # Builds a fire-event request that targets the test table.
    request = FireEventRequest(True, event_data)
    request.dbName = unique_database
    request.tableName = table
    return request

  def fire_and_wait(request):
    # Fires the listener event and waits for the event processor to catch up.
    self.hive_client.fire_listener_event(request)
    EventProcessorUtils.wait_for_event_processing(self)

  event_data = FireEventRequestData()
  event_data.refreshEvent = True
  # Refresh at table level
  refresh_request = build_request(event_data)
  fire_and_wait(refresh_request)
  # Refresh at partition level: the same request, now carrying partition values
  refresh_request.partitionVals = ["2024"]
  fire_and_wait(refresh_request)
  # Invalidate at table level: refreshEvent is cleared and a new request is built
  event_data.refreshEvent = False
  fire_and_wait(build_request(event_data))
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  self.client.execute("describe formatted {}.{}".format(unique_database, table))
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types="
                  "'COMMIT_COMPACTION_EVENT' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_commit_compaction_event(self, unique_database):
  """Runs with failures injected for COMMIT_COMPACTION_EVENT events: Hive inserts
  into a transactional, unpartitioned table twice and runs a minor compaction.
  The event processor must report ACTIVE and the table must still be
  describable."""
  table = "hive_table_commit_compaction"
  self.__create_table_and_load__(unique_database, table, True, False)
  insert_stmt = "insert into {}.{} values (1),(2),(3)".format(unique_database, table)
  for _ in range(2):
    self.run_stmt_in_hive(insert_stmt)
  compact_stmt = "alter table {}.{} compact 'minor' and wait"
  self.run_stmt_in_hive(compact_stmt.format(unique_database, table))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  self.client.execute("describe formatted {}.{}".format(unique_database, table))
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ALLOC_WRITE_ID_EVENT' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_alloc_write_id_event(self, unique_database):
  """Runs with failures injected for ALLOC_WRITE_ID_EVENT events: opens a
  transaction and allocates a write id for a transactional partitioned table,
  then checks that the event processor reports ACTIVE and the table can still be
  described."""
  table = "hive_table_alloc_write_id"
  self.__create_table_and_load__(unique_database, table, True, True)
  acid_txn = AcidTxn(self.hive_client)
  opened_txn_id = acid_txn.open_txns()
  acid_txn.allocate_table_write_ids(opened_txn_id, unique_database, table)
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  self.client.execute("describe formatted {}.{}".format(unique_database, table))
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='COMMIT_TXN' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_commit_txn(self, unique_database):
  """Runs with failures injected for COMMIT_TXN events: Hive inserts into a
  transactional partitioned table, then the event processor must report ACTIVE
  and the table must still be describable."""
  table = "hive_table_commit_txn"
  self.__create_table_and_load__(unique_database, table, True, True)
  insert_stmt = "insert into {}.{} partition (year=2022) values (1),(2),(3)"
  self.run_stmt_in_hive(insert_stmt.format(unique_database, table))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  self.client.execute("describe formatted {}.{}".format(unique_database, table))
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ABORT_TXN' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_abort_txn_event(self, unique_database):
  """Runs with failures injected for ABORT_TXN events: opens a transaction and
  aborts it, then checks that the event processor reports ACTIVE and the test
  table can still be described."""
  table = "hive_table_abort_txn"
  acid_txn = AcidTxn(self.hive_client)
  self.__create_table_and_load__(unique_database, table, True, True)
  opened_txn_id = acid_txn.open_txns()
  acid_txn.abort_txn(opened_txn_id)
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  self.client.execute("describe formatted {}.{}".format(unique_database, table))
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--invalidate_metadata_on_event_processing_failure=false "
                  "--invalidate_global_metadata_on_event_processing_failure=true "
                  "--inject_process_event_failure_event_types="
                  "'ALTER_TABLE, ADD_PARTITION' "
                  "--hms_event_polling_interval_s=2")
def test_event_processor_error_global_invalidate(self, unique_database):
  """Checks that the automatic global invalidate brings the event processor back
  to ACTIVE after it hits an error. Table-level invalidation is switched off and
  global invalidation is switched on through the catalogd flags above, with
  failures injected for ALTER_TABLE and ADD_PARTITION events."""
  table = "hive_table_global_invalidate"
  self.__create_table_and_load__(unique_database, table, True, True)
  # Change the owner, then add a partition, both from Hive.
  hive_stmt_templates = (
      "alter table {}.{} set owner user `test-user`",
      "alter table {}.{} add partition(year=2024)",
  )
  for template in hive_stmt_templates:
    self.run_stmt_in_hive(template.format(unique_database, table))
  EventProcessorUtils.wait_for_event_processing(self)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  describe_stmt = "describe formatted {}.{}".format(unique_database, table)
  described = self.client.execute(describe_stmt)
  # Both Hive-side changes must be visible in the describe output.
  self.verify_owner_property(described, 'test-user')
  assert "2024" in described.get_data()
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_ratio=0.5 "
                  "--inject_process_event_failure_event_types="
                  "'ALTER_TABLE,ADD_PARTITION,"
                  "ALTER_PARTITION,INSERT,ABORT_TXN,COMMIT_TXN'")
def test_event_processor_error_stress_test(self, unique_database):
  """Executes inserts for transactional tables and external tables. Also runs
  replication tests.

  Runs with a failure ratio of 0.5 injected for the event types listed in the
  catalogd flags above. After each phase the event processor must report ACTIVE.
  """
  # inserts on transactional tables
  TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
      self.cluster, unique_database, True)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  # Bound outside the try block so the cleanup below can always refer to it.
  test_db = unique_database + "_no_transact"
  try:
    self.run_stmt_in_hive("""create database {}""".format(test_db))
    # inserts on external tables
    TestEventProcessingBase._run_test_insert_events_impl(self.hive_client,
        self.client, self.cluster, test_db, False)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
  finally:
    # 'if exists' keeps the cleanup from raising, and masking the original error,
    # when the database was never created.
    self.run_stmt_in_hive("""drop database if exists {} cascade""".format(test_db))
  # replication related tests
  TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client,
      self.client, self.cluster, self.filesystem_client)
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
|
||||
|
||||
def __create_table_and_load__(self, db_name, table_name, is_transactional,
    is_partitioned):
  """Creates table `db_name`.`table_name` with a single int column 'i' from Hive,
  waits for event processing, and then describes the table through Impala so
  that it gets loaded.

  is_transactional adds the 'transactional' keyword to the create statement.
  is_partitioned adds a 'partitioned by (year int)' clause."""
  transactional_clause = " transactional " if is_transactional else ''
  partition_clause = " partitioned by (year int) " if is_partitioned else ''
  statement_parts = [
      "create",
      transactional_clause,
      "table `{}`.`{}`(i int)",
      partition_clause,
  ]
  create_template = " ".join(statement_parts)
  self.run_stmt_in_hive(create_template.format(db_name, table_name))
  EventProcessorUtils.wait_for_event_processing(self)
  # Make the table loaded
  self.client.execute("describe {}.{}".format(db_name, table_name))
|
||||
|
||||
@staticmethod
|
||||
def verify_owner_property(result, user_name):
|
||||
match = False
|
||||
for row in result.data:
|
||||
fields = row.split("\t")
|
||||
if "Owner:" in fields[0]:
|
||||
assert user_name == fields[1].strip()
|
||||
match = True
|
||||
assert True, match
|
||||
1
tests/metadata/__init__.py
Normal file
1
tests/metadata/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
# This file is needed to make the files in this directory a python module
|
||||
@@ -25,7 +25,7 @@ import time
|
||||
|
||||
from beeswaxd.BeeswaxService import QueryState
|
||||
from copy import deepcopy
|
||||
from test_ddl_base import TestDdlBase
|
||||
from tests.metadata.test_ddl_base import TestDdlBase
|
||||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||||
from tests.common.environ import (HIVE_MAJOR_VERSION)
|
||||
from tests.common.file_utils import create_table_from_orc
|
||||
|
||||
@@ -18,8 +18,10 @@ from __future__ import absolute_import, division, print_function
|
||||
from subprocess import check_call
|
||||
import pytest
|
||||
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2
|
||||
from tests.metadata.test_event_processing_base import TestEventProcessingBase
|
||||
from tests.util.event_processor_utils import EventProcessorUtils
|
||||
|
||||
|
||||
@@ -35,120 +37,14 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
def test_transactional_insert_events(self, unique_database):
|
||||
"""Executes 'run_test_insert_events' for transactional tables.
|
||||
"""
|
||||
self.run_test_insert_events(unique_database, is_transactional=True)
|
||||
TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
|
||||
ImpalaCluster.get_e2e_test_cluster(), unique_database, is_transactional=True)
|
||||
|
||||
def test_insert_events(self, unique_database):
|
||||
"""Executes 'run_test_insert_events' for non-transactional tables.
|
||||
"""
|
||||
self.run_test_insert_events(unique_database)
|
||||
|
||||
def run_test_insert_events(self, unique_database, is_transactional=False):
|
||||
"""Test for insert event processing. Events are created in Hive and processed in
|
||||
Impala. The following cases are tested :
|
||||
Insert into table --> for partitioned and non-partitioned table
|
||||
Insert overwrite table --> for partitioned and non-partitioned table
|
||||
Insert into partition --> for partitioned table
|
||||
"""
|
||||
# Test table with no partitions.
|
||||
tbl_insert_nopart = 'tbl_insert_nopart'
|
||||
self.run_stmt_in_hive(
|
||||
"drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
|
||||
tblproperties = ""
|
||||
if is_transactional:
|
||||
tblproperties = "tblproperties ('transactional'='true'," \
|
||||
"'transactional_properties'='insert_only')"
|
||||
self.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
|
||||
% (unique_database, tbl_insert_nopart, tblproperties))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Test CTAS and insert by Impala with empty results (IMPALA-10765).
|
||||
self.execute_query("create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
|
||||
self.execute_query("insert into {db}.ctas_tbl select * from {db}.{tbl}"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart))
|
||||
# Test insert into table, this will fire an insert event.
|
||||
self.run_stmt_in_hive("insert into %s.%s values(101, 200)"
|
||||
% (unique_database, tbl_insert_nopart))
|
||||
# With MetastoreEventProcessor running, the insert event will be processed. Query the
|
||||
# table from Impala.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Verify that the data is present in Impala.
|
||||
data = self.execute_scalar("select * from %s.%s" %
|
||||
(unique_database, tbl_insert_nopart))
|
||||
assert data.split('\t') == ['101', '200']
|
||||
|
||||
# Test insert overwrite. Overwrite the existing value.
|
||||
self.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
|
||||
% (unique_database, tbl_insert_nopart))
|
||||
# Make sure the event has been processed.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Verify that the data is present in Impala.
|
||||
data = self.execute_scalar("select * from %s.%s" %
|
||||
(unique_database, tbl_insert_nopart))
|
||||
assert data.split('\t') == ['101', '201']
|
||||
# Test insert overwrite by Impala with empty results (IMPALA-10765).
|
||||
self.execute_query("insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart))
|
||||
result = self.execute_query("select * from {db}.{tbl}"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart))
|
||||
assert len(result.data) == 0
|
||||
|
||||
# Test partitioned table.
|
||||
tbl_insert_part = 'tbl_insert_part'
|
||||
self.run_stmt_in_hive("drop table if exists %s.%s"
|
||||
% (unique_database, tbl_insert_part))
|
||||
self.run_stmt_in_hive("create table %s.%s (id int, name string) "
|
||||
"partitioned by(day int, month int, year int) %s"
|
||||
% (unique_database, tbl_insert_part, tblproperties))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Test insert overwrite by Impala with empty results (IMPALA-10765).
|
||||
self.execute_query(
|
||||
"create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
|
||||
"select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
|
||||
prop=tblproperties))
|
||||
self.execute_query(
|
||||
"insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
|
||||
"name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
|
||||
# Insert data into partitions.
|
||||
self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
|
||||
"values(101, 'x')" % (unique_database, tbl_insert_part))
|
||||
# Make sure the event has been processed.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Verify that the data is present in Impala.
|
||||
data = self.execute_scalar("select * from %s.%s" % (unique_database, tbl_insert_part))
|
||||
assert data.split('\t') == ['101', 'x', '28', '3', '2019']
|
||||
|
||||
# Test inserting into existing partitions.
|
||||
self.run_stmt_in_hive("insert into %s.%s partition(day=28, month=03, year=2019)"
|
||||
"values(102, 'y')" % (unique_database, tbl_insert_part))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Verify that the data is present in Impala.
|
||||
data = self.execute_scalar("select count(*) from %s.%s where day=28 and month=3 "
|
||||
"and year=2019" % (unique_database, tbl_insert_part))
|
||||
assert data.split('\t') == ['2']
|
||||
# Test inserting into existing partitions by Impala with empty results
|
||||
# (IMPALA-10765).
|
||||
self.execute_query("insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
|
||||
"select id, name from {db}.ctas_part"
|
||||
.format(db=unique_database, tbl=tbl_insert_part))
|
||||
|
||||
# Test insert overwrite into existing partitions
|
||||
self.run_stmt_in_hive("insert overwrite table %s.%s partition(day=28, month=03, "
|
||||
"year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# Verify that the data is present in Impala.
|
||||
data = self.execute_scalar("select * from %s.%s where day=28 and month=3 and"
|
||||
" year=2019 and id=101" % (unique_database, tbl_insert_part))
|
||||
assert data.split('\t') == ['101', 'z', '28', '3', '2019']
|
||||
# Test insert overwrite into existing partitions by Impala with empty results
|
||||
# (IMPALA-10765).
|
||||
self.execute_query("insert overwrite {db}.{tbl} "
|
||||
"partition(day=28, month=03, year=2019) "
|
||||
"select id, name from {db}.ctas_part"
|
||||
.format(db=unique_database, tbl=tbl_insert_part))
|
||||
result = self.execute_query("select * from {db}.{tbl} "
|
||||
"where day=28 and month=3 and year=2019"
|
||||
.format(db=unique_database, tbl=tbl_insert_part))
|
||||
assert len(result.data) == 0
|
||||
TestEventProcessingBase._run_test_insert_events_impl(self.hive_client, self.client,
|
||||
ImpalaCluster.get_e2e_test_cluster(), unique_database)
|
||||
|
||||
def test_iceberg_inserts(self):
|
||||
"""IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation
|
||||
@@ -177,180 +73,13 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
self._run_test_empty_partition_events(unique_database, False)
|
||||
|
||||
def test_event_based_replication(self):
|
||||
self.__run_event_based_replication_tests()
|
||||
|
||||
def __run_event_based_replication_tests(self, transactional=True):
|
||||
"""Hive Replication relies on the insert events generated on the tables.
|
||||
This test issues some basic replication commands from Hive and makes sure
|
||||
that the replicated table has correct data."""
|
||||
TBLPROPERTIES = self.__get_transactional_tblproperties(transactional)
|
||||
source_db = ImpalaTestSuite.get_random_name("repl_source_")
|
||||
target_db = ImpalaTestSuite.get_random_name("repl_target_")
|
||||
unpartitioned_tbl = "unpart_tbl"
|
||||
partitioned_tbl = "part_tbl"
|
||||
try:
|
||||
self.run_stmt_in_hive("create database {0}".format(source_db))
|
||||
self.run_stmt_in_hive(
|
||||
"alter database {0} set dbproperties ('repl.source.for'='xyz')".format(source_db))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# explicit create table command since create table like doesn't allow tblproperties
|
||||
self.client.execute("create table {0}.{1} (a string, b string) stored as parquet"
|
||||
" {2}".format(source_db, unpartitioned_tbl, TBLPROPERTIES))
|
||||
self.client.execute(
|
||||
"create table {0}.{1} (id int, bool_col boolean, tinyint_col tinyint, "
|
||||
"smallint_col smallint, int_col int, bigint_col bigint, float_col float, "
|
||||
"double_col double, date_string string, string_col string, "
|
||||
"timestamp_col timestamp) partitioned by (year int, month int) stored as parquet"
|
||||
" {2}".format(source_db, partitioned_tbl, TBLPROPERTIES))
|
||||
|
||||
# case I: insert
|
||||
# load the table with some data from impala, this also creates new partitions.
|
||||
self.client.execute("insert into {0}.{1}"
|
||||
" select * from functional.tinytable".format(source_db,
|
||||
unpartitioned_tbl))
|
||||
self.client.execute("insert into {0}.{1} partition(year,month)"
|
||||
" select * from functional_parquet.alltypessmall".format(
|
||||
source_db, partitioned_tbl))
|
||||
rows_in_unpart_tbl = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(source_db, unpartitioned_tbl)).split('\t')[
|
||||
0])
|
||||
rows_in_part_tbl = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(source_db, partitioned_tbl)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl > 0
|
||||
assert rows_in_part_tbl > 0
|
||||
# bootstrap the replication
|
||||
self.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# create a target database where tables will be replicated
|
||||
self.client.execute("create database {0}".format(target_db))
|
||||
# replicate the table from source to target
|
||||
self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
assert unpartitioned_tbl in self.client.execute(
|
||||
"show tables in {0}".format(target_db)).get_data()
|
||||
assert partitioned_tbl in self.client.execute(
|
||||
"show tables in {0}".format(target_db)).get_data()
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
|
||||
.split('\t')[0])
|
||||
assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
|
||||
assert rows_in_part_tbl == rows_in_part_tbl_target
|
||||
|
||||
# case II: insert into existing partitions.
|
||||
self.client.execute("insert into {0}.{1}"
|
||||
" select * from functional.tinytable".format(
|
||||
source_db, unpartitioned_tbl))
|
||||
self.client.execute("insert into {0}.{1} partition(year,month)"
|
||||
" select * from functional_parquet.alltypessmall".format(
|
||||
source_db, partitioned_tbl))
|
||||
self.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl)).split('\t')[0])
|
||||
assert 2 * rows_in_unpart_tbl == rows_in_unpart_tbl_target
|
||||
assert 2 * rows_in_part_tbl == rows_in_part_tbl_target
|
||||
|
||||
# Case III: insert overwrite
|
||||
# impala does a insert overwrite of the tables.
|
||||
self.client.execute("insert overwrite table {0}.{1}"
|
||||
" select * from functional.tinytable".format(
|
||||
source_db, unpartitioned_tbl))
|
||||
self.client.execute("insert overwrite table {0}.{1} partition(year,month)"
|
||||
" select * from functional_parquet.alltypessmall".format(
|
||||
source_db, partitioned_tbl))
|
||||
self.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
|
||||
assert rows_in_part_tbl == rows_in_part_tbl_target
|
||||
|
||||
# Case IV: CTAS which creates a transactional table.
|
||||
self.client.execute(
|
||||
"create table {0}.insertonly_nopart_ctas {1} as "
|
||||
"select * from {0}.{2}".format(source_db, TBLPROPERTIES, unpartitioned_tbl))
|
||||
self.client.execute(
|
||||
"create table {0}.insertonly_part_ctas partitioned by (year, month) {1}"
|
||||
" as select * from {0}.{2}".format(source_db, TBLPROPERTIES, partitioned_tbl))
|
||||
self.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_source = int(self.execute_scalar("select count(*) from "
|
||||
"{0}.insertonly_nopart_ctas".format(source_db)).split('\t')[0])
|
||||
rows_in_unpart_tbl_target = int(self.execute_scalar("select count(*) from "
|
||||
"{0}.insertonly_nopart_ctas".format(target_db)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
|
||||
rows_in_unpart_tbl_source = int(self.execute_scalar("select count(*) from "
|
||||
"{0}.insertonly_part_ctas".format(source_db)).split('\t')[0])
|
||||
rows_in_unpart_tbl_target = int(self.execute_scalar("select count(*) from "
|
||||
"{0}.insertonly_part_ctas".format(target_db)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
|
||||
|
||||
# Case V: truncate table
|
||||
# impala truncates both the tables. Make sure replication sees that.
|
||||
self.client.execute("truncate table {0}.{1}".format(source_db, unpartitioned_tbl))
|
||||
self.client.execute("truncate table {0}.{1}".format(source_db, partitioned_tbl))
|
||||
self.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(self.execute_scalar(
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl_target == 0
|
||||
assert rows_in_part_tbl_target == 0
|
||||
finally:
|
||||
src_db = self.__get_db_nothrow(source_db)
|
||||
target_db_obj = self.__get_db_nothrow(target_db)
|
||||
if src_db is not None:
|
||||
self.run_stmt_in_hive(
|
||||
"alter database {0} set dbproperties ('repl.source.for'='')".format(source_db))
|
||||
self.run_stmt_in_hive("drop database if exists {0} cascade".format(source_db))
|
||||
if target_db_obj is not None:
|
||||
self.run_stmt_in_hive("drop database if exists {0} cascade".format(target_db))
|
||||
# workaround for HIVE-24135. the managed db location doesn't get cleaned up
|
||||
if src_db is not None and src_db.managedLocationUri is not None:
|
||||
self.filesystem_client.delete_file_dir(src_db.managedLocationUri, True)
|
||||
if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
|
||||
self.filesystem_client.delete_file_dir(target_db_obj.managedLocationUri, True)
|
||||
|
||||
def __get_db_nothrow(self, name):
|
||||
try:
|
||||
return self.hive_client.get_database(name)
|
||||
except Exception:
|
||||
return None
|
||||
TestEventProcessingBase._run_event_based_replication_tests_impl(self.hive_client,
|
||||
self.client, ImpalaCluster.get_e2e_test_cluster(), self.filesystem_client)
|
||||
|
||||
def _run_test_empty_partition_events(self, unique_database, is_transactional):
|
||||
test_tbl = unique_database + ".test_events"
|
||||
TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
|
||||
TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
|
||||
is_transactional)
|
||||
self.run_stmt_in_hive("create table {0} (key string, value string) \
|
||||
partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
|
||||
EventProcessorUtils.wait_for_event_processing(self)
|
||||
@@ -444,7 +173,8 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
def run_test_partition_location_change_from_hive(self, unique_database, tbl_name,
|
||||
is_transactional=False):
|
||||
fq_tbl_name = unique_database + "." + tbl_name
|
||||
TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
|
||||
TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
|
||||
is_transactional)
|
||||
# Create the table
|
||||
self.client.execute(
|
||||
"create table %s (i int) partitioned by(j int) stored as parquet %s"
|
||||
@@ -475,13 +205,3 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
break
|
||||
return row[1].rstrip()
|
||||
return None
|
||||
|
||||
def __get_transactional_tblproperties(self, is_transactional):
|
||||
"""
|
||||
Util method to generate the tblproperties for transactional tables
|
||||
"""
|
||||
tblproperties = ""
|
||||
if is_transactional:
|
||||
tblproperties = "tblproperties ('transactional'='true'," \
|
||||
"'transactional_properties'='insert_only')"
|
||||
return tblproperties
|
||||
|
||||
348
tests/metadata/test_event_processing_base.py
Normal file
348
tests/metadata/test_event_processing_base.py
Normal file
@@ -0,0 +1,348 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import, division, print_function
|
||||
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import SkipIfFS, SkipIfCatalogV2
|
||||
from tests.util.event_processor_utils import EventProcessorUtils
|
||||
|
||||
|
||||
@SkipIfFS.hive
|
||||
@SkipIfCatalogV2.hms_event_polling_disabled()
|
||||
class TestEventProcessingBase(ImpalaTestSuite):
|
||||
|
||||
@classmethod
|
||||
def _run_test_insert_events_impl(cls, hive_client, impala_client, impala_cluster,
|
||||
unique_database, is_transactional=False):
|
||||
"""Test for insert event processing. Events are created in Hive and processed in
|
||||
Impala. The following cases are tested :
|
||||
Insert into table --> for partitioned and non-partitioned table
|
||||
Insert overwrite table --> for partitioned and non-partitioned table
|
||||
Insert into partition --> for partitioned table
|
||||
"""
|
||||
# Test table with no partitions.
|
||||
tbl_insert_nopart = 'tbl_insert_nopart'
|
||||
cls.run_stmt_in_hive(
|
||||
"drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
|
||||
tblproperties = ""
|
||||
if is_transactional:
|
||||
tblproperties = "tblproperties ('transactional'='true'," \
|
||||
"'transactional_properties'='insert_only')"
|
||||
cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
|
||||
% (unique_database, tbl_insert_nopart, tblproperties))
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Test CTAS and insert by Impala with empty results (IMPALA-10765).
|
||||
cls.execute_query_expect_success(impala_client,
|
||||
"create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
|
||||
cls.execute_query_expect_success(impala_client,
|
||||
"insert into {db}.ctas_tbl select * from {db}.{tbl}"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart))
|
||||
# Test insert into table, this will fire an insert event.
|
||||
cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
|
||||
% (unique_database, tbl_insert_nopart))
|
||||
# With MetastoreEventProcessor running, the insert event will be processed. Query the
|
||||
# table from Impala.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Verify that the data is present in Impala.
|
||||
data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
|
||||
(unique_database, tbl_insert_nopart))
|
||||
assert data.split('\t') == ['101', '200']
|
||||
|
||||
# Test insert overwrite. Overwrite the existing value.
|
||||
cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
|
||||
% (unique_database, tbl_insert_nopart))
|
||||
# Make sure the event has been processed.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Verify that the data is present in Impala.
|
||||
data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
|
||||
(unique_database, tbl_insert_nopart))
|
||||
assert data.split('\t') == ['101', '201']
|
||||
# Test insert overwrite by Impala with empty results (IMPALA-10765).
|
||||
cls.execute_query_expect_success(impala_client,
|
||||
"insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
|
||||
.format(db=unique_database, tbl=tbl_insert_nopart))
|
||||
result = cls.execute_query_expect_success(impala_client,
|
||||
"select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
|
||||
assert len(result.data) == 0
|
||||
|
||||
# Test partitioned table.
|
||||
tbl_insert_part = 'tbl_insert_part'
|
||||
cls.run_stmt_in_hive("drop table if exists %s.%s"
|
||||
% (unique_database, tbl_insert_part))
|
||||
cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
|
||||
"partitioned by(day int, month int, year int) %s"
|
||||
% (unique_database, tbl_insert_part, tblproperties))
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Test insert overwrite by Impala with empty results (IMPALA-10765).
|
||||
cls.execute_query_expect_success(impala_client,
|
||||
"create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
|
||||
"select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
|
||||
prop=tblproperties))
|
||||
cls.execute_query_expect_success(impala_client,
|
||||
"insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
|
||||
"name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
|
||||
# Insert data into partitions.
|
||||
cls.run_stmt_in_hive(
|
||||
"insert into %s.%s partition(day=28, month=03, year=2019)"
|
||||
"values(101, 'x')" % (unique_database, tbl_insert_part))
|
||||
# Make sure the event has been processed.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Verify that the data is present in Impala.
|
||||
data = cls.execute_scalar_expect_success(impala_client,
|
||||
"select * from %s.%s" % (unique_database, tbl_insert_part))
|
||||
assert data.split('\t') == ['101', 'x', '28', '3', '2019']
|
||||
|
||||
# Test inserting into existing partitions.
|
||||
cls.run_stmt_in_hive(
|
||||
"insert into %s.%s partition(day=28, month=03, year=2019)"
|
||||
"values(102, 'y')" % (unique_database, tbl_insert_part))
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Verify that the data is present in Impala.
|
||||
data = cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from %s.%s where day=28 and month=3 "
|
||||
"and year=2019" % (unique_database, tbl_insert_part))
|
||||
assert data.split('\t') == ['2']
|
||||
# Test inserting into existing partitions by Impala with empty results
|
||||
# (IMPALA-10765).
|
||||
cls.execute_query_expect_success(impala_client,
|
||||
"insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
|
||||
"select id, name from {db}.ctas_part"
|
||||
.format(db=unique_database, tbl=tbl_insert_part))
|
||||
|
||||
# Test insert overwrite into existing partitions
|
||||
cls.run_stmt_in_hive(
|
||||
"insert overwrite table %s.%s partition(day=28, month=03, "
|
||||
"year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# Verify that the data is present in Impala.
|
||||
data = cls.execute_scalar_expect_success(impala_client,
|
||||
"select * from %s.%s where day=28 and month=3 and"
|
||||
" year=2019 and id=101" % (unique_database, tbl_insert_part))
|
||||
assert data.split('\t') == ['101', 'z', '28', '3', '2019']
|
||||
# Test insert overwrite into existing partitions by Impala with empty results
|
||||
# (IMPALA-10765).
|
||||
cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
|
||||
"partition(day=28, month=03, year=2019) "
|
||||
"select id, name from {db}.ctas_part"
|
||||
.format(db=unique_database, tbl=tbl_insert_part))
|
||||
result = cls.execute_query_expect_success(impala_client, "select * from {db}.{tbl} "
|
||||
"where day=28 and month=3 and year=2019"
|
||||
.format(db=unique_database, tbl=tbl_insert_part))
|
||||
assert len(result.data) == 0
|
||||
|
||||
@classmethod
|
||||
def _run_event_based_replication_tests_impl(cls, hive_client, impala_client,
|
||||
impala_cluster, filesystem_client, transactional=True):
|
||||
"""Hive Replication relies on the insert events generated on the tables.
|
||||
This test issues some basic replication commands from Hive and makes sure
|
||||
that the replicated table has correct data."""
|
||||
TBLPROPERTIES = cls._get_transactional_tblproperties(transactional)
|
||||
source_db = ImpalaTestSuite.get_random_name("repl_source_")
|
||||
target_db = ImpalaTestSuite.get_random_name("repl_target_")
|
||||
unpartitioned_tbl = "unpart_tbl"
|
||||
partitioned_tbl = "part_tbl"
|
||||
try:
|
||||
cls.run_stmt_in_hive("create database {0}".format(source_db))
|
||||
cls.run_stmt_in_hive(
|
||||
"alter database {0} set dbproperties ('repl.source.for'='xyz')"
|
||||
.format(source_db))
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# explicit create table command since create table like doesn't allow tblproperties
|
||||
impala_client.execute("create table {0}.{1} (a string, b string) stored as parquet"
|
||||
" {2}".format(source_db, unpartitioned_tbl, TBLPROPERTIES))
|
||||
impala_client.execute(
|
||||
"create table {0}.{1} (id int, bool_col boolean, tinyint_col tinyint, "
|
||||
"smallint_col smallint, int_col int, bigint_col bigint, float_col float, "
|
||||
"double_col double, date_string string, string_col string, "
|
||||
"timestamp_col timestamp) partitioned by (year int, month int) stored as parquet"
|
||||
" {2}".format(source_db, partitioned_tbl, TBLPROPERTIES))
|
||||
|
||||
# case I: insert
|
||||
# load the table with some data from impala, this also creates new partitions.
|
||||
impala_client.execute("insert into {0}.{1}"
|
||||
" select * from functional.tinytable".format(source_db,
|
||||
unpartitioned_tbl))
|
||||
impala_client.execute("insert into {0}.{1} partition(year,month)"
|
||||
" select * from functional_parquet.alltypessmall".format(
|
||||
source_db, partitioned_tbl))
|
||||
rows_in_unpart_tbl = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(source_db, unpartitioned_tbl)).split('\t')[
|
||||
0])
|
||||
rows_in_part_tbl = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(source_db, partitioned_tbl))
|
||||
.split('\t')[0])
|
||||
assert rows_in_unpart_tbl > 0
|
||||
assert rows_in_part_tbl > 0
|
||||
# bootstrap the replication
|
||||
cls.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# create a target database where tables will be replicated
|
||||
impala_client.execute("create database {0}".format(target_db))
|
||||
# replicate the table from source to target
|
||||
cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
|
||||
target_db))
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
assert unpartitioned_tbl in impala_client.execute(
|
||||
"show tables in {0}".format(target_db)).get_data()
|
||||
assert partitioned_tbl in impala_client.execute(
|
||||
"show tables in {0}".format(target_db)).get_data()
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
|
||||
.split('\t')[0])
|
||||
assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
|
||||
assert rows_in_part_tbl == rows_in_part_tbl_target
|
||||
|
||||
# case II: insert into existing partitions.
|
||||
impala_client.execute("insert into {0}.{1}"
|
||||
" select * from functional.tinytable".format(
|
||||
source_db, unpartitioned_tbl))
|
||||
impala_client.execute("insert into {0}.{1} partition(year,month)"
|
||||
" select * from functional_parquet.alltypessmall".format(
|
||||
source_db, partitioned_tbl))
|
||||
cls.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
|
||||
target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
|
||||
.split('\t')[0])
|
||||
assert 2 * rows_in_unpart_tbl == rows_in_unpart_tbl_target
|
||||
assert 2 * rows_in_part_tbl == rows_in_part_tbl_target
|
||||
|
||||
# Case III: insert overwrite
|
||||
# impala does a insert overwrite of the tables.
|
||||
impala_client.execute("insert overwrite table {0}.{1}"
|
||||
" select * from functional.tinytable".format(
|
||||
source_db, unpartitioned_tbl))
|
||||
impala_client.execute("insert overwrite table {0}.{1} partition(year,month)"
|
||||
" select * from functional_parquet.alltypessmall".format(
|
||||
source_db, partitioned_tbl))
|
||||
cls.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
|
||||
target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
|
||||
.split('\t')[0])
|
||||
assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
|
||||
assert rows_in_part_tbl == rows_in_part_tbl_target
|
||||
|
||||
# Case IV: CTAS which creates a transactional table.
|
||||
impala_client.execute(
|
||||
"create table {0}.insertonly_nopart_ctas {1} as "
|
||||
"select * from {0}.{2}".format(source_db, TBLPROPERTIES, unpartitioned_tbl))
|
||||
impala_client.execute(
|
||||
"create table {0}.insertonly_part_ctas partitioned by (year, month) {1}"
|
||||
" as select * from {0}.{2}".format(source_db, TBLPROPERTIES, partitioned_tbl))
|
||||
cls.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
|
||||
target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_source = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from "
|
||||
"{0}.insertonly_nopart_ctas".format(source_db)).split('\t')[0])
|
||||
rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from "
|
||||
"{0}.insertonly_nopart_ctas".format(target_db)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
|
||||
rows_in_unpart_tbl_source = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from "
|
||||
"{0}.insertonly_part_ctas".format(source_db)).split('\t')[0])
|
||||
rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from "
|
||||
"{0}.insertonly_part_ctas".format(target_db)).split('\t')[0])
|
||||
assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
|
||||
|
||||
# Case V: truncate table
|
||||
# impala truncates both the tables. Make sure replication sees that.
|
||||
impala_client.execute("truncate table {0}.{1}".format(source_db,
|
||||
unpartitioned_tbl))
|
||||
impala_client.execute("truncate table {0}.{1}".format(source_db, partitioned_tbl))
|
||||
cls.run_stmt_in_hive("repl dump {0}".format(source_db))
|
||||
# replicate the table from source to target
|
||||
cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
|
||||
target_db))
|
||||
# we wait until the events catch up in case repl command above did some HMS
|
||||
# operations.
|
||||
EventProcessorUtils.wait_for_event_processing_impl(hive_client, impala_cluster)
|
||||
# confirm the number of rows in target match with the source table.
|
||||
rows_in_unpart_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, unpartitioned_tbl))
|
||||
.split('\t')[0])
|
||||
rows_in_part_tbl_target = int(cls.execute_scalar_expect_success(impala_client,
|
||||
"select count(*) from {0}.{1}".format(target_db, partitioned_tbl))
|
||||
.split('\t')[0])
|
||||
assert rows_in_unpart_tbl_target == 0
|
||||
assert rows_in_part_tbl_target == 0
|
||||
finally:
|
||||
src_db = cls.__get_db_nothrow(source_db)
|
||||
target_db_obj = cls.__get_db_nothrow(target_db)
|
||||
if src_db is not None:
|
||||
cls.run_stmt_in_hive(
|
||||
"alter database {0} set dbproperties ('repl.source.for'='')".format(source_db))
|
||||
cls.run_stmt_in_hive("drop database if exists {0} cascade"
|
||||
.format(source_db))
|
||||
if target_db_obj is not None:
|
||||
cls.run_stmt_in_hive("drop database if exists {0} cascade"
|
||||
.format(target_db))
|
||||
# workaround for HIVE-24135. the managed db location doesn't get cleaned up
|
||||
if src_db is not None and src_db.managedLocationUri is not None:
|
||||
filesystem_client.delete_file_dir(src_db.managedLocationUri,
|
||||
True)
|
||||
if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
|
||||
filesystem_client.delete_file_dir(
|
||||
target_db_obj.managedLocationUri, True)
|
||||
|
||||
@classmethod
|
||||
def __get_db_nothrow(self, name):
|
||||
try:
|
||||
return self.hive_client.get_database(name)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def _get_transactional_tblproperties(self, is_transactional):
|
||||
"""
|
||||
Util method to generate the tblproperties for transactional tables
|
||||
"""
|
||||
tblproperties = ""
|
||||
if is_transactional:
|
||||
tblproperties = "tblproperties ('transactional'='true'," \
|
||||
"'transactional_properties'='insert_only')"
|
||||
return tblproperties
|
||||
@@ -16,7 +16,7 @@
|
||||
# under the License.
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
from test_ddl_base import TestDdlBase
|
||||
from tests.metadata.test_ddl_base import TestDdlBase
|
||||
|
||||
|
||||
class TestResetMetadata(TestDdlBase):
|
||||
|
||||
@@ -82,19 +82,24 @@ class EventProcessorUtils(object):
|
||||
|
||||
@staticmethod
|
||||
def wait_for_event_processing(test_suite, timeout=10):
|
||||
if isinstance(test_suite, CustomClusterTestSuite):
|
||||
impala_cluster = test_suite.cluster
|
||||
else:
|
||||
impala_cluster = ImpalaCluster.get_e2e_test_cluster()
|
||||
EventProcessorUtils.wait_for_event_processing_impl(test_suite.hive_client,
|
||||
impala_cluster, timeout)
|
||||
|
||||
@staticmethod
|
||||
def wait_for_event_processing_impl(hive_client, impala_cluster, timeout=10):
|
||||
"""Waits till the event processor has synced to the latest event id from metastore
|
||||
or the timeout value in seconds whichever is earlier"""
|
||||
if EventProcessorUtils.get_event_processor_status() == "DISABLED":
|
||||
return
|
||||
assert timeout > 0
|
||||
assert test_suite.hive_client is not None
|
||||
assert hive_client is not None
|
||||
current_event_id = EventProcessorUtils.get_current_notification_id(
|
||||
test_suite.hive_client)
|
||||
hive_client)
|
||||
EventProcessorUtils.wait_for_synced_event_id(timeout, current_event_id)
|
||||
if isinstance(test_suite, CustomClusterTestSuite):
|
||||
impala_cluster = test_suite.cluster
|
||||
else:
|
||||
impala_cluster = ImpalaCluster.get_e2e_test_cluster()
|
||||
# Wait until the impalad catalog versions agree with the catalogd's version.
|
||||
catalogd_version = impala_cluster.catalogd.service.get_catalog_version()
|
||||
for impalad in impala_cluster.impalads:
|
||||
|
||||
Reference in New Issue
Block a user