IMPALA-13285: Ignore COMMIT_TXN events on Apache Hive 3

In Apache Hive 3, HMS doesn't provide the API to retrive WriteEvents
info of a given transaction. COMMIT_TXN event just contains a
transaction id so Impala can't process it.

This patch ignores COMMIT_TXN events when building on Apache Hive 3.
Some tests in MetastoreEventsProcessorTest and EventExecutorServiceTest
are skipped due to this.

Tests:
 - Manually tested on Apache Hive 3. Verified that EventProcessor still
   works after receiving COMMIT_TXN events.
 - Passed some tests in MetastoreEventsProcessorTest and
   EventExecutorServiceTest that previously failed due to EventProcessor
   going into ERROR state.

Change-Id: I863e39b3702028a14e83fed1fe912b441f2c28db
Reviewed-on: http://gerrit.cloudera.org:8080/23117
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
stiga-huang
2025-07-03 19:47:41 +08:00
committed by Impala Public Jenkins
parent 591bf48c72
commit dc46aa48d9
5 changed files with 45 additions and 30 deletions

View File

@@ -74,7 +74,7 @@ import org.apache.impala.catalog.Hive3MetastoreShimBase;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.MetastoreEvents.DerivedMetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.IgnoredEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreTableEvent;
import org.apache.impala.catalog.events.MetastoreEventsProcessor.MetaDataFilter;
import org.apache.impala.catalog.events.MetastoreNotificationException;
@@ -631,38 +631,17 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
/**
* CDP Hive-3 only function.
*/
public static class CommitTxnEvent extends MetastoreEvent {
public static class CommitTxnEvent extends IgnoredEvent {
public static final String EVENT_TYPE = "COMMIT_TXN";
public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
NotificationEvent event) {
super(catalogOpExecutor, metrics, event);
throw new UnsupportedOperationException("CommitTxnEvent is not supported.");
}
@Override
protected void process() throws MetastoreNotificationException {
}
@Override
protected boolean onFailure(Exception e) {
return false;
}
@Override
protected boolean isEventProcessingDisabled() {
return false;
}
@Override
protected SelfEventContext getSelfEventContext() {
return null;
}
@Override
protected boolean shouldSkipWhenSyncingToLatestEventId() {
return false;
public void process() {
LOG.info("Ignoring COMMIT_TXN event {}", getEventId());
}
}
@@ -1045,6 +1024,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
*/
public static List<PseudoCommitTxnEvent> getPseudoCommitTxnEvents(
CommitTxnEvent event) {
throw new UnsupportedOperationException("CommitTxnEvent is not supported.");
LOG.info("Ignoring COMMIT_TXN event {}", event.getEventId());
return Collections.emptyList();
}
}

View File

@@ -3606,10 +3606,7 @@ public class MetastoreEvents {
*/
public static class IgnoredEvent extends MetastoreEvent {
/**
* Prevent instantiation from outside should use MetastoreEventFactory instead
*/
private IgnoredEvent(
public IgnoredEvent(
CatalogOpExecutor catalogOpExecutor, Metrics metrics, NotificationEvent event) {
super(catalogOpExecutor, metrics, event);
}

View File

@@ -53,9 +53,11 @@ import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.testutil.CatalogServiceTestCatalog;
import org.apache.impala.testutil.TestUtils;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -643,6 +645,9 @@ public class EventExecutorServiceTest {
*/
@Test
public void testCommitTxn() throws Exception {
Assume.assumeFalse("Skipping this since COMMIT_TXN events are ignored on Apache " +
"Hive 2/3. So the validWriteIds list is not updated correctly.",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
EventExecutorService eventExecutorService = createEventExecutorService(2, 2);
transactionTest(false);
shutDownEventExecutorService(eventExecutorService);
@@ -654,6 +659,9 @@ public class EventExecutorServiceTest {
*/
@Test
public void testAbortTxn() throws Exception {
Assume.assumeFalse("Skipping this since COMMIT_TXN events are ignored on Apache " +
"Hive 2/3. So the validWriteIds list is not updated correctly.",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
EventExecutorService eventExecutorService = createEventExecutorService(2, 2);
transactionTest(true);
shutDownEventExecutorService(eventExecutorService);

View File

@@ -2859,6 +2859,8 @@ public class MetastoreEventsProcessorTest {
@Test
public void testCommitEvent() throws TException, ImpalaException, IOException {
Assume.assumeFalse("Skipping this since it depends on the behavior of CDP Hive 3",
TestUtils.isApacheHiveVersion());
// Turn on incremental refresh for transactional table
final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
try {
@@ -2879,6 +2881,8 @@ public class MetastoreEventsProcessorTest {
@Test
public void testAbortEvent() throws TException, ImpalaException, IOException {
Assume.assumeFalse("COMMIT_TXN events are not processed on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
// Turn on incremental refresh for transactional table
final TBackendGflags origCfg = BackendConfig.INSTANCE.getBackendCfg();
try {
@@ -3632,6 +3636,8 @@ public class MetastoreEventsProcessorTest {
*/
@Test
public void testSkipFetchOpenTransactionEvent() throws Exception {
Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
long currentEventId = eventsProcessor_.getCurrentEventId();
try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
// 1. Fetch notification events after open and commit transaction
@@ -3678,6 +3684,8 @@ public class MetastoreEventsProcessorTest {
*/
@Test
public void testFetchEventsInBatchWithOpenTxnAsLastEvent() throws Exception {
Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
long currentEventId = eventsProcessor_.getCurrentEventId();
try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
long txnId = MetastoreShim.openTransaction(client.getHiveClient());
@@ -3694,6 +3702,8 @@ public class MetastoreEventsProcessorTest {
@Test
public void testNotFetchingUnwantedEvents() throws Exception {
Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
String tblName = "test_event_skip_list";
createDatabase(TEST_DB_NAME, null);
Map<String, String> params = new HashMap<>();
@@ -3840,6 +3850,8 @@ public class MetastoreEventsProcessorTest {
@Test
public void testReloadEventOnLoadedTable() throws Exception {
Assume.assumeFalse("RELOAD event is not generated on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
String tblName = "test_reload";
createDatabase(TEST_DB_NAME, null);
eventsProcessor_.processEvents();
@@ -3885,6 +3897,9 @@ public class MetastoreEventsProcessorTest {
@Test
public void testCommitCompactionEventOnLoadedTable() throws Exception {
Assume.assumeFalse("Skipping this since COMMIT_TXN event is not supported on " +
"Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
String tblName = "test_commit_compaction";
createDatabase(TEST_DB_NAME, null);
eventsProcessor_.processEvents();
@@ -3984,6 +3999,8 @@ public class MetastoreEventsProcessorTest {
*/
@Test
public void testEmptyPartitionValues() throws Exception {
Assume.assumeFalse("RELOAD event is not generated on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
String prevFlag = BackendConfig.INSTANCE.debugActions();
try {
String tblName = "test_empty";
@@ -4080,6 +4097,8 @@ public class MetastoreEventsProcessorTest {
public void testAllocWriteIdEvent(String tblName, boolean isPartitioned,
boolean isLoadTable) throws TException, TransactionException, CatalogException {
Assume.assumeFalse("COMMIT_TXN events are not processed on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
createDatabase(TEST_DB_NAME, null);
eventsProcessor_.processEvents();
createTransactionalTable(TEST_DB_NAME, tblName, isPartitioned);
@@ -4122,6 +4141,8 @@ public class MetastoreEventsProcessorTest {
@Test
public void testNotificationEventRequest() throws Exception {
Assume.assumeFalse("EventTypeSkipList is not supported on Apache Hive 2/3",
TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3);
long currentEventId = eventsProcessor_.getCurrentEventId();
// Generate some DB only related events
createDatabaseFromImpala(TEST_DB_NAME, null);
@@ -4239,6 +4260,8 @@ public class MetastoreEventsProcessorTest {
@Test
public void testCommitTxnEventTargetName() throws Exception {
Assume.assumeFalse("Skipping this since it depends on the behavior of CDP Hive 3",
TestUtils.isApacheHiveVersion());
String tblName = "test_commit_txn";
String partTblName = "test_commit_txn_part";
String insertNonPartTbl =

View File

@@ -447,6 +447,13 @@ public class TestUtils {
return Integer.parseInt(hiveMajorVersion);
}
/**
* Returns whether we are using Apache Hive versions.
*/
public static boolean isApacheHiveVersion() {
return Boolean.parseBoolean(System.getenv("USE_APACHE_HIVE"));
}
/**
* Gets checks if the catalog server running on the given host and port has
* catalog-v2 enabled