IMPALA-12680: Fix NullPointerException during AlterTableAddPartitions

When a global INVALIDATE METADATA command runs concurrently with an
AlterTableAddPartition statement, a precondition check in
addHmsPartitions() could throw a NullPointerException. This happens
because Map<String, Long> partitionToEventId is initialized to null
when the event processor is not active.

We should always initialize 'partitionToEventId' to an empty hash map
regardless of the state of the event processor. If the event processor
is not active, then addHmsPartitions() adds partitions that are fetched
directly from the metastore.

Note: Also addressed the same issue that could potentially occur in
AlterTableRecoverPartitions.

Testing:
- Verified manually that the NullPointerException scenario is avoided.
- Added a unit test to verify the above use case.

Change-Id: I730fed311ebc09762dccc152d9583d5394b0b9b3
Reviewed-on: http://gerrit.cloudera.org:8080/21430
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Sai Hemanth Gantasala
2024-05-14 18:40:04 -07:00
committed by stiga-huang
parent b4670a8638
commit 0140a15a04
3 changed files with 64 additions and 12 deletions

View File

@@ -145,6 +145,7 @@ import org.apache.impala.catalog.events.MetastoreEvents.DropTableEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey; import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventPropertyKey;
import org.apache.impala.catalog.events.MetastoreEventsProcessor; import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
import org.apache.impala.catalog.events.MetastoreNotificationException; import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.monitor.CatalogMonitor; import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.catalog.monitor.CatalogOperationTracker; import org.apache.impala.catalog.monitor.CatalogOperationTracker;
@@ -1270,7 +1271,7 @@ public class CatalogOpExecutor {
format = params.getSet_file_format_params().file_format; format = params.getSet_file_format_params().file_format;
} }
alterTableAddPartitions(tbl, params.getAdd_partition_params(), format, alterTableAddPartitions(tbl, params.getAdd_partition_params(), format,
catalogTimeline, modification); catalogTimeline, modification, debugAction);
reloadMetadata = false; reloadMetadata = false;
responseSummaryMsg = "New partition has been added to the table."; responseSummaryMsg = "New partition has been added to the table.";
break; break;
@@ -4600,8 +4601,8 @@ public class CatalogOpExecutor {
*/ */
private void alterTableAddPartitions(Table tbl, private void alterTableAddPartitions(Table tbl,
TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat, TAlterTableAddPartitionParams addPartParams, THdfsFileFormat fileFormat,
EventSequence catalogTimeline, InProgressTableModification modification) EventSequence catalogTimeline, InProgressTableModification modification,
throws ImpalaException { String debugAction) throws ImpalaException {
Preconditions.checkState(tbl.isWriteLockedByCurrentThread()); Preconditions.checkState(tbl.isWriteLockedByCurrentThread());
TableName tableName = tbl.getTableName(); TableName tableName = tbl.getTableName();
@@ -4639,10 +4640,10 @@ public class CatalogOpExecutor {
List<Partition> difference = null; List<Partition> difference = null;
try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { try (MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ? Map<String, Long> partitionToEventId = Maps.newHashMap();
Maps.newHashMap() : null;
List<Partition> addedHmsPartitions = addHmsPartitionsInTransaction(msClient, List<Partition> addedHmsPartitions = addHmsPartitionsInTransaction(msClient,
tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists, catalogTimeline); tbl, allHmsPartitionsToAdd, partitionToEventId, ifNotExists, catalogTimeline,
debugAction);
// Handle HDFS cache. This is done in a separate round bacause we have to apply // Handle HDFS cache. This is done in a separate round bacause we have to apply
// caching only to newly added partitions. // caching only to newly added partitions.
alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions, alterTableCachePartitions(msTbl, msClient, tableName, addedHmsPartitions,
@@ -5299,7 +5300,7 @@ public class CatalogOpExecutor {
*/ */
private List<Partition> addHmsPartitions(MetaStoreClient msClient, private List<Partition> addHmsPartitions(MetaStoreClient msClient,
Table tbl, List<Partition> allHmsPartitionsToAdd, Table tbl, List<Partition> allHmsPartitionsToAdd,
@Nullable Map<String, Long> partitionToEventId, boolean ifNotExists, Map<String, Long> partitionToEventId, boolean ifNotExists,
EventSequence catalogTimeline) throws ImpalaRuntimeException, CatalogException { EventSequence catalogTimeline) throws ImpalaRuntimeException, CatalogException {
long eventId = getCurrentEventId(msClient, catalogTimeline); long eventId = getCurrentEventId(msClient, catalogTimeline);
List<Partition> addedHmsPartitions = Lists List<Partition> addedHmsPartitions = Lists
@@ -5332,7 +5333,6 @@ public class CatalogOpExecutor {
// add_partitions call above. // add_partitions call above.
addedHmsPartitions.addAll(addedPartitions); addedHmsPartitions.addAll(addedPartitions);
} else { } else {
Preconditions.checkNotNull(partitionToEventId);
addedHmsPartitions.addAll(partitionToEventSubMap.keySet()); addedHmsPartitions.addAll(partitionToEventSubMap.keySet());
// we cannot keep a mapping of Partition to event ids because the // we cannot keep a mapping of Partition to event ids because the
// partition objects are changed later in the cachePartitions code path. // partition objects are changed later in the cachePartitions code path.
@@ -5361,8 +5361,12 @@ public class CatalogOpExecutor {
*/ */
private List<Partition> addHmsPartitionsInTransaction(MetaStoreClient msClient, private List<Partition> addHmsPartitionsInTransaction(MetaStoreClient msClient,
Table tbl, List<Partition> partitions, Map<String, Long> partitionToEventId, Table tbl, List<Partition> partitions, Map<String, Long> partitionToEventId,
boolean ifNotExists, EventSequence catalogTimeline) throws ImpalaException { boolean ifNotExists, EventSequence catalogTimeline, String debugAction)
throws ImpalaException {
if (!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) { if (!AcidUtils.isTransactionalTable(tbl.getMetaStoreTable().getParameters())) {
if (DebugUtils.hasDebugAction(debugAction, DebugUtils.ENABLE_EVENT_PROCESSOR)) {
catalog_.startEventsProcessor();
}
return addHmsPartitions(msClient, tbl, partitions, partitionToEventId, return addHmsPartitions(msClient, tbl, partitions, partitionToEventId,
ifNotExists, catalogTimeline); ifNotExists, catalogTimeline);
} }
@@ -6302,10 +6306,12 @@ public class CatalogOpExecutor {
} }
// Add partitions to metastore. // Add partitions to metastore.
Map<String, Long> partitionToEventId = catalog_.isEventProcessingActive() ? Map<String, Long> partitionToEventId = Maps.newHashMap();
Maps.newHashMap() : null;
String annotation = String.format("Recovering %d partitions for %s", String annotation = String.format("Recovering %d partitions for %s",
hmsPartitions.size(), tbl.getFullName()); hmsPartitions.size(), tbl.getFullName());
if (DebugUtils.hasDebugAction(debugAction, DebugUtils.ENABLE_EVENT_PROCESSOR)) {
catalog_.startEventsProcessor();
}
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation); try (ThreadNameAnnotator tna = new ThreadNameAnnotator(annotation);
MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) { MetaStoreClient msClient = catalog_.getMetaStoreClient(catalogTimeline)) {
List<Partition> addedPartitions = addHmsPartitions(msClient, tbl, hmsPartitions, List<Partition> addedPartitions = addHmsPartitions(msClient, tbl, hmsPartitions,

View File

@@ -83,6 +83,9 @@ public class DebugUtils {
// debug action label for introducing delay in loading table metadata. // debug action label for introducing delay in loading table metadata.
public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay"; public static final String LOAD_TABLES_DELAY = "impalad_load_tables_delay";
// debug action to enable eventProcessor
public static final String ENABLE_EVENT_PROCESSOR = "enable_event_processor";
/** /**
* Returns true if the label of action is set in the debugActions * Returns true if the label of action is set in the debugActions
*/ */

View File

@@ -3962,6 +3962,40 @@ public class MetastoreEventsProcessorTest {
} }
} }
@Test
public void testAlterTableWithEpDisabled() throws Exception {
try {
createDatabaseFromImpala(TEST_DB_NAME, null);
String testTable = "testAlterTableNoError";
createTableFromImpala(TEST_DB_NAME, testTable, true);
eventsProcessor_.processEvents();
// set EP to paused state and execute Alter table add partition query
eventsProcessor_.pause();
long numberOfSelfEventsBefore =
eventsProcessor_.getMetrics()
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
TPartitionDef partitionDef = new TPartitionDef();
partitionDef.addToPartition_spec(new TPartitionKeyValue("p1", "100"));
partitionDef.addToPartition_spec(new TPartitionKeyValue("p2", "200"));
alterTableAddPartition(TEST_DB_NAME, testTable, partitionDef,
"enable_event_processor");
eventsProcessor_.processEvents();
assertEquals(EventProcessorStatus.ACTIVE, eventsProcessor_.getStatus());
long numberOfSelfEventsAfter =
eventsProcessor_.getMetrics()
.getCounter(MetastoreEventsProcessor.EVENTS_SKIPPED_METRIC).getCount();
// expect ADD_PARTITION event to be skipped as self-event
assertEquals("Unexpected self events skipped: ", numberOfSelfEventsAfter,
numberOfSelfEventsBefore + 1);
} catch (NullPointerException ex) {
throw new CatalogException("Exception occured while applying AlterTableEvent", ex);
} finally {
if (eventsProcessor_.getStatus() != EventProcessorStatus.ACTIVE) {
eventsProcessor_.start();
}
}
}
private void createDatabase(String catName, String dbName, private void createDatabase(String catName, String dbName,
Map<String, String> params) throws TException { Map<String, String> params) throws TException {
try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) { try(MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
@@ -4259,8 +4293,17 @@ public class MetastoreEventsProcessorTest {
*/ */
private void alterTableAddPartition( private void alterTableAddPartition(
String dbName, String tblName, TPartitionDef partitionDef) throws ImpalaException { String dbName, String tblName, TPartitionDef partitionDef) throws ImpalaException {
alterTableAddPartition(dbName, tblName,partitionDef, null);
}
private void alterTableAddPartition(String dbName, String tblName,
TPartitionDef partitionDef, String debugActions) throws ImpalaException {
TDdlExecRequest req = new TDdlExecRequest(); TDdlExecRequest req = new TDdlExecRequest();
req.setQuery_options(new TDdlQueryOptions()); TDdlQueryOptions queryOptions = new TDdlQueryOptions();
if (debugActions != null) {
queryOptions.setDebug_action(debugActions);
}
req.setQuery_options(queryOptions);
req.setDdl_type(TDdlType.ALTER_TABLE); req.setDdl_type(TDdlType.ALTER_TABLE);
TAlterTableParams alterTableParams = new TAlterTableParams(); TAlterTableParams alterTableParams = new TAlterTableParams();
alterTableParams.setTable_name(new TTableName(dbName, tblName)); alterTableParams.setTable_name(new TTableName(dbName, tblName));