From b37f4509fa03359be77bd7966e40cb2ffd1ec3e4 Mon Sep 17 00:00:00 2001 From: stiga-huang Date: Thu, 22 May 2025 18:33:29 +0800 Subject: [PATCH] IMPALA-14089: Support REFRESH on multiple partitions Currently we just support REFRESH on the whole table or a specific partition: REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])] If users want to refresh multiple partitions, they have to submit multiple statements, each for a single partition. This has some drawbacks: - It requires holding the table write lock inside catalogd multiple times, which increases lock contention with other read/write operations on the same table, e.g. getPartialCatalogObject requests from coordinators. - Catalog version of the table will be increased multiple times. Coordinators in local catalog mode are more likely to see different versions between their getPartialCatalogObject requests, so they have to retry planning to resolve InconsistentMetadataFetchException. - Partitions are reloaded in sequence. They should be reloaded in parallel like we do in refreshing the whole table. This patch extends the syntax to refresh multiple partitions in one statement: REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...]) [PARTITION (key_col1=val3 [, key_col2=val4...])...]] Example: REFRESH foo PARTITION(p=0) PARTITION(p=1) PARTITION(p=2); TResetMetadataRequest is extended to have a list of partition specs for this. If the list has only one item, we still use the existing logic of reloading a specific partition. If the list has more than one item, partitions will be reloaded in parallel. This is implemented in CatalogServiceCatalog#reloadTable(). Previously it always invoked HdfsTable#load() with partitionsToUpdate=null. Now the parameter is set when TResetMetadataRequest has the partition list. HMS notification events of RELOAD type will be fired for each partition if enable_reload_events is turned on. 
Once HIVE-28967 is resolved, we can fire a single event for multiple partitions. Updated docs in impala_refresh.xml. Tests: - Added FE and e2e tests Change-Id: Ie5b0deeaf23129ed6e1ba2817f54291d7f63d04e Reviewed-on: http://gerrit.cloudera.org:8080/22938 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- common/thrift/CatalogService.thrift | 9 ++- docs/impala_keydefs.ditamap | 1 + docs/topics/impala_refresh.xml | 16 ++++- .../apache/impala/compat/MetastoreShim.java | 2 +- .../apache/impala/compat/MetastoreShim.java | 37 +++++++--- fe/src/main/cup/sql-parser.cup | 20 +++++- .../impala/analysis/ResetMetadataStmt.java | 43 +++++++----- .../impala/catalog/CatalogServiceCatalog.java | 26 ++++--- .../org/apache/impala/catalog/HdfsTable.java | 21 ++++-- .../impala/service/CatalogOpExecutor.java | 55 +++++++++------ .../org/apache/impala/util/CatalogOpUtil.java | 4 +- .../apache/impala/analysis/AnalyzerTest.java | 16 ++++- .../apache/impala/analysis/ParserTest.java | 4 ++ .../analysis/StmtMetadataLoaderTest.java | 5 +- .../org/apache/impala/analysis/ToSqlTest.java | 2 + .../events/MetastoreEventsProcessorTest.java | 2 +- .../apache/impala/service/FrontendTest.java | 2 + .../apache/impala/util/CatalogOpUtilTest.java | 4 ++ .../test_events_custom_configs.py | 17 +++-- tests/metadata/test_refresh_partition.py | 67 +++++++++++++++++-- 20 files changed, 268 insertions(+), 85 deletions(-) diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 261fb7365..51379682f 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -325,8 +325,9 @@ struct TResetMetadataRequest { // the entire catalog 4: optional CatalogObjects.TTableName table_name - // If set, refreshes the specified partition, otherwise - // refreshes the whole table + // Deprecated - use partition_spec_list instead. Keeps this for compatibility. + // If set, refreshes the specified partition. 
+ // Refreshes the whole table if both this and partition_spec_list are not set. 5: optional list partition_spec // If set, refreshes functions in the specified database. @@ -344,6 +345,10 @@ struct TResetMetadataRequest { // debug_action is set from the query_option when available. 10: optional string debug_action + + // If set, refreshes the specified list of partitions + // Refreshes the whole table if both this and partition_spec are not set. + 11: optional list> partition_spec_list } // Response from TResetMetadataRequest diff --git a/docs/impala_keydefs.ditamap b/docs/impala_keydefs.ditamap index af4c41682..46ec6b856 100644 --- a/docs/impala_keydefs.ditamap +++ b/docs/impala_keydefs.ditamap @@ -10604,6 +10604,7 @@ under the License. Impala 1.3.2 Impala 1.3.0 + Impala 5.0 Impala 4.2 Impala 3.4 Impala 3.3 diff --git a/docs/topics/impala_refresh.xml b/docs/topics/impala_refresh.xml index 4e43b1e2d..dcc959044 100644 --- a/docs/topics/impala_refresh.xml +++ b/docs/topics/impala_refresh.xml @@ -67,7 +67,9 @@ under the License.

-REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])] +REFRESH [db_name.]table_name +[PARTITION (key_col1=val1 [, key_col2=val2...]) + [PARTITION (key_col1=val3 [, key_col2=val4...])...]]

@@ -115,7 +117,7 @@ under the License.

- Refreshing a single partition: + Refreshing specific partitions:

@@ -125,6 +127,13 @@ under the License. values for each of the partition key columns.

+

In Impala 5.0 and higher, the REFRESH statement + can apply to multiple partitions at a time, rather than a single partition. Use the + optional PARTITION (partition_spec) clause for + each of the partitions. +

+

The following rules apply:

    @@ -164,6 +173,9 @@ refresh p2 partition (z=1, y=0) -- Incomplete partition spec causes an error. refresh p2 partition (y=0) ERROR: AnalysisException: Items in partition spec must exactly match the partition columns in the table definition: default.p2 (1 vs 2) + +-- Refresh multiple partitions. +refresh p2 partition (y=0, z=3) partition (y=1, z=0) partition (y=1, z=2); ]]> diff --git a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java index e6ac112e7..ac0a0657c 100644 --- a/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-apache-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -435,7 +435,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase { */ @VisibleForTesting public static List fireReloadEventHelper(MetaStoreClient msClient, - boolean isRefresh, List partVals, String dbName, String tableName, + boolean isRefresh, List> partVals, String dbName, String tableName, Map selfEventParams) throws TException { throw new UnsupportedOperationException("Reload event is not supported."); } diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 78a0e4f85..f2c71fab8 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -524,15 +524,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase { * @param msClient Metastore client, * @param isRefresh if this flag is set to true then it is a refresh query, else it * is an invalidate metadata query. 
- * @param partVals The partition list corresponding to - * the table, used by Apache Hive 3 + * @param partValsList partition values (List) for each partition * @param dbName * @param tableName * @return a list of eventIds for the reload events */ @VisibleForTesting public static List fireReloadEventHelper(MetaStoreClient msClient, - boolean isRefresh, List partVals, String dbName, String tableName, + boolean isRefresh, List> partValsList, String dbName, String tableName, Map selfEventParams) throws TException { Preconditions.checkNotNull(msClient); Preconditions.checkNotNull(dbName); @@ -542,16 +541,32 @@ public class MetastoreShim extends Hive3MetastoreShimBase { FireEventRequest rqst = new FireEventRequest(true, data); rqst.setDbName(dbName); rqst.setTableName(tableName); - rqst.setPartitionVals(partVals); rqst.setTblParams(selfEventParams); - FireEventResponse response = msClient.getHiveClient().fireListenerEvent(rqst); - if (!response.isSetEventIds()) { - LOG.error("FireEventResponse does not have event ids set for table {}.{}. This " - + "may cause the table to unnecessarily be refreshed when the " + - "refresh/invalidate event is received.", dbName, tableName); - return Collections.emptyList(); + if (partValsList == null || partValsList.isEmpty()) { + FireEventResponse response = msClient.getHiveClient().fireListenerEvent(rqst); + if (!response.isSetEventIds()) { + LOG.error("FireEventResponse does not have event ids set for table {}.{}. This " + + "may cause the table to unnecessarily be refreshed when the " + + "refresh/invalidate event is received.", dbName, tableName); + return Collections.emptyList(); + } + return response.getEventIds(); } - return response.getEventIds(); + List eventIds = new ArrayList<>(); + // TODO: Fire one event once HIVE-28967 is resolved. 
+ for (List partVals : partValsList) { + rqst.setPartitionVals(partVals); + FireEventResponse response = msClient.getHiveClient().fireListenerEvent(rqst); + if (!response.isSetEventIds()) { + LOG.error("FireEventResponse does not have event ids set for table {}.{} " + + "partition {}. This may cause the table to unnecessarily be refreshed " + + "when the refresh/invalidate event is received.", + dbName, tableName, partVals); + continue; + } + eventIds.addAll(response.getEventIds()); + } + return eventIds; } /** diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index 553a1a599..16c197b6b 100755 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -504,6 +504,8 @@ nonterminal PartitionSpec partition_spec; nonterminal PartitionSet opt_partition_set; // Required partition set nonterminal PartitionSet partition_set; +// Optional partition spec list +nonterminal List partition_spec_list; nonterminal List partition_clause; nonterminal List static_partition_key_value_list; nonterminal List partition_key_value_list; @@ -845,8 +847,8 @@ reset_metadata_stmt ::= {: RESULT = ResetMetadataStmt.createInvalidateStmt(table); :} | KW_REFRESH table_name:table {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table); :} - | KW_REFRESH table_name:table partition_spec:partition - {: RESULT = ResetMetadataStmt.createRefreshPartitionStmt(table, partition); :} + | KW_REFRESH table_name:table partition_spec_list:partitions + {: RESULT = ResetMetadataStmt.createRefreshPartitionsStmt(table, partitions); :} | KW_REFRESH KW_FUNCTIONS ident_or_unreserved:db {: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :} | KW_REFRESH KW_AUTHORIZATION @@ -2774,6 +2776,20 @@ opt_partition_spec ::= {: RESULT = null; :} ; +partition_spec_list ::= + partition_spec:item + {: + List list = new ArrayList<>(); + list.add(item); + RESULT = list; + :} + | partition_spec_list:list partition_spec:item + {: + list.add(item); + RESULT = list; + :} + ; + 
static_partition_key_value_list ::= static_partition_key_value:item {: diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java index d980ccba7..a75693f89 100644 --- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java @@ -68,8 +68,8 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt // database functions. private TableName tableName_; - // not null when refreshing a single partition - private final PartitionSpec partitionSpec_; + // not null when refreshing specified partitions + private final List partitionSpecList_; // not null when refreshing functions in a database. private final String database_; @@ -87,13 +87,15 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt private String clientIp_; private ResetMetadataStmt(Action action, String db, TableName tableName, - PartitionSpec partitionSpec) { + List partitionSpecList) { Preconditions.checkNotNull(action); action_ = action; database_ = db; tableName_ = tableName; - partitionSpec_ = partitionSpec; - if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_); + partitionSpecList_ = partitionSpecList; + if (partitionSpecList_ != null) { + partitionSpecList_.forEach(p -> p.setTableName(tableName_)); + } } public static ResetMetadataStmt createInvalidateStmt() { @@ -111,10 +113,11 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt Preconditions.checkNotNull(tableName), /*partition*/ null); } - public static ResetMetadataStmt createRefreshPartitionStmt(TableName tableName, - PartitionSpec partitionSpec) { + public static ResetMetadataStmt createRefreshPartitionsStmt(TableName tableName, + List partitionSpecList) { return new ResetMetadataStmt(Action.REFRESH_PARTITION, /*db*/ null, - Preconditions.checkNotNull(tableName), 
Preconditions.checkNotNull(partitionSpec)); + Preconditions.checkNotNull(tableName), + Preconditions.checkNotNull(partitionSpecList)); } public static ResetMetadataStmt createRefreshFunctionsStmt(String db) { @@ -132,8 +135,6 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt @Override public TableName getTableName() { return tableName_; } - public PartitionSpec getPartitionSpec() { return partitionSpec_; } - @VisibleForTesting protected Action getAction() { return action_; } @@ -144,7 +145,7 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt @Override public void collectTableRefs(List tblRefs) { - if (tableName_ != null && partitionSpec_ != null) { + if (tableName_ != null && partitionSpecList_ != null) { tblRefs.add(new TableRef(tableName_.toPath(), null)); } } @@ -172,20 +173,22 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt throw new AnalysisException(Analyzer.TBL_DOES_NOT_EXIST_ERROR_MSG + tableName_); } - if (partitionSpec_ != null) { + if (partitionSpecList_ != null) { try { // Get local table info without reaching out to HMS FeTable table = analyzer.getTable(dbName, tableName_.getTbl(), /* must_exist */ true); if (AcidUtils.isTransactionalTable(table)) { - throw new AnalysisException("Refreshing a partition is not allowed on " + + throw new AnalysisException("Refreshing partitions is not allowed on " + "transactional tables. 
Try to refresh the whole table instead."); } } catch (TableLoadingException e) { throw new AnalysisException(e); } - partitionSpec_.setPrivilegeRequirement(Privilege.ANY); - partitionSpec_.analyze(analyzer); + for (PartitionSpec ps : partitionSpecList_) { + ps.setPrivilegeRequirement(Privilege.ANY); + ps.analyze(analyzer); + } } } else { FeTable tbl = analyzer.getTableNoThrow(dbName, tableName_.getTbl()); @@ -244,8 +247,8 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt result.append("REFRESH ").append(tableName_.toSql()); break; case REFRESH_PARTITION: - result.append("REFRESH ").append(tableName_.toSql()).append(" ") - .append(partitionSpec_.toSql(options)); + result.append("REFRESH ").append(tableName_.toSql()); + partitionSpecList_.forEach(ps -> result.append(" ").append(ps.toSql(options))); break; case INVALIDATE_METADATA_ALL: result.append("INVALIDATE METADATA"); @@ -270,7 +273,11 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt if (tableName_ != null) { params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl())); } - if (partitionSpec_ != null) params.setPartition_spec(partitionSpec_.toThrift()); + if (partitionSpecList_ != null) { + for (PartitionSpec ps : partitionSpecList_) { + params.addToPartition_spec_list(ps.toThrift()); + } + } if (database_ != null) params.setDb_name(database_); if (action_ == Action.REFRESH_AUTHORIZATION) { params.setAuthorization(true); diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java index 0e40679dc..dfeadefd2 100644 --- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java +++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java @@ -2863,7 +2863,7 @@ public class CatalogServiceCatalog extends Catalog { CatalogObject.ThriftObjectType resultType, String reason, long eventId, boolean 
isSkipFileMetadataReload, EventSequence catalogTimeline) throws CatalogException { - LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName())); + LOG.info("Refreshing table metadata: {}", tbl.getFullName()); Preconditions.checkState(!(tbl instanceof IncompleteTable)); String dbName = tbl.getDb().getName(); String tblName = tbl.getName(); @@ -2905,10 +2905,20 @@ public class CatalogServiceCatalog extends Catalog { dbName + "." + tblName, e); } if (tbl instanceof HdfsTable) { + Set partitionsToUpdate = null; + if (request.getPartition_spec_listSize() > 0) { + // Convert to partition names and deduplicate partition specs. + Map> partName2PartSpec = new HashMap<>(); + for (List partSpec : request.getPartition_spec_list()) { + partName2PartSpec.put(HdfsTable.constructPartitionName(partSpec), partSpec); + } + partitionsToUpdate = partName2PartSpec.keySet(); + request.setPartition_spec_list(new ArrayList<>(partName2PartSpec.values())); + } ((HdfsTable) tbl) .load(true, msClient.getHiveClient(), msTbl, !isSkipFileMetadataReload, /* loadTableSchema*/true, request.refresh_updated_hms_partitions, - /* partitionsToUpdate*/null, request.debug_action, + partitionsToUpdate, request.debug_action, /*partitionToEventId*/null, reason, catalogTimeline); } else { tbl.load(true, msClient.getHiveClient(), msTbl, reason, catalogTimeline); @@ -3505,8 +3515,8 @@ public class CatalogServiceCatalog extends Catalog { String reason, long newCatalogVersion, @Nullable HdfsPartition hdfsPartition, EventSequence catalogTimeline) throws CatalogException { Preconditions.checkState(hdfsTable.isWriteLockedByCurrentThread()); - LOG.info(String.format("Refreshing partition metadata: %s %s (%s)", - hdfsTable.getFullName(), partitionName, reason)); + LOG.info("Refreshing partition metadata: {} {} ({})", + hdfsTable.getFullName(), partitionName, reason); try (MetaStoreClient msClient = getMetaStoreClient(catalogTimeline)) { org.apache.hadoop.hive.metastore.api.Partition hmsPartition = 
null; try { @@ -3522,9 +3532,9 @@ public class CatalogServiceCatalog extends Catalog { // non-existing partition was dropped from catalog, so we mark it as refreshed wasPartitionReloaded.setRef(true); } else { - LOG.info(String.format("Partition metadata for %s was not refreshed since " + LOG.info("Partition metadata for {} {} was not refreshed since " + "it does not exist in metastore anymore", - hdfsTable.getFullName() + " " + partitionName)); + hdfsTable.getFullName(), partitionName); } return hdfsTable.toTCatalogObject(resultType); } catch (Exception e) { @@ -3540,8 +3550,8 @@ public class CatalogServiceCatalog extends Catalog { } hdfsTable.setCatalogVersion(newCatalogVersion); wasPartitionReloaded.setRef(true); - LOG.info(String.format("Refreshed partition metadata: %s %s", - hdfsTable.getFullName(), partitionName)); + LOG.info("Refreshed partition metadata: {} {}", hdfsTable.getFullName(), + partitionName); return hdfsTable.toTCatalogObject(resultType); } diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index d1618f162..06bbcb3ca 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -1498,7 +1498,11 @@ public class HdfsTable extends Table implements FeFsTable { * hive metastore. * @return true if partition does not exist in metastore, else false. 
*/ - public abstract boolean isRemoved(HdfsPartition hdfsPartition); + public boolean isRemoved(HdfsPartition hdfsPartition) { + return isRemoved(hdfsPartition.getPartitionName()); + } + + public abstract boolean isRemoved(String partName); /** * Loads any partitions which are known to metastore but not provided in @@ -1566,9 +1570,14 @@ public class HdfsTable extends Table implements FeFsTable { loadTimeForFileMdNs_ += loadNewPartitions(partitionNames, addedPartitions); // If a list of modified partitions (old and new) is specified, don't reload file // metadata for the new ones as they have already been detected in HMS and have been - // reloaded by loadNewPartitions(). + // reloaded by loadNewPartitions(). Also ignore partitions that don't exist in HMS. if (partitionsToUpdate_ != null) { partitionsToUpdate_.removeAll(addedPartitions); + int orgSize = partitionsToUpdate_.size(); + if (partitionsToUpdate_.removeIf(this::isRemoved)) { + LOG.info("Ignored {} non-existing partitions of table {}", + orgSize - partitionsToUpdate_.size(), getFullName()); + } } // Load file metadata. 
Until we have a notification mechanism for when a // file changes in hdfs, it is sometimes required to reload all the file @@ -1638,8 +1647,8 @@ public class HdfsTable extends Table implements FeFsTable { } @Override - public boolean isRemoved(HdfsPartition hdfsPartition) { - return !msPartitions_.containsKey(hdfsPartition.getPartitionName()); + public boolean isRemoved(String partName) { + return !msPartitions_.containsKey(partName); } /** @@ -1732,8 +1741,8 @@ public class HdfsTable extends Table implements FeFsTable { } @Override - public boolean isRemoved(HdfsPartition hdfsPartition) { - return !partitionNamesFromHms_.contains(hdfsPartition.getPartitionName()); + public boolean isRemoved(String partName) { + return !partitionNamesFromHms_.contains(partName); } @Override diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java index ed5c1e8cb..c9a986cb0 100644 --- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java @@ -7158,12 +7158,14 @@ public class CatalogOpExecutor { CatalogObject.ThriftObjectType.INVALIDATION : CatalogObject.ThriftObjectType.FULL; if (isTableLoadedInCatalog) { - if (req.isSetPartition_spec()) { + if (req.isSetPartition_spec() || req.getPartition_spec_listSize() == 1) { Preconditions.checkArgument(!AcidUtils.isTransactionalTable(tbl)); + List partitionSpec = req.isSetPartition_spec() ? + req.getPartition_spec() : req.getPartition_spec_list().get(0); Reference wasPartitionRefreshed = new Reference<>(false); // TODO if the partition was not really refreshed because the partSpec // was wrong, do we still need to send back the table? 
- updatedThriftTable = catalog_.reloadPartition(tbl, req.getPartition_spec(), + updatedThriftTable = catalog_.reloadPartition(tbl, partitionSpec, wasPartitionRefreshed, resultType, cmdString, catalogTimeline); } else { // TODO IMPALA-8809: Optimisation for partitioned tables: @@ -7264,15 +7266,26 @@ public class CatalogOpExecutor { */ private void fireReloadEventAndUpdateRefreshEventId( TResetMetadataRequest req, TableName tblName, Table tbl) { - List partVals = null; + // Partition spec (List) for each partition + List> partSpecList = null; + // Partition values (List) for each partition + List> partValsList = null; if (req.isSetPartition_spec()) { - partVals = req.getPartition_spec().stream(). - map(TPartitionKeyValue::getValue).collect(Collectors.toList()); + partSpecList = Collections.singletonList(req.partition_spec); + } else if (req.isSetPartition_spec_list()) { + partSpecList = req.partition_spec_list; + } + if (partSpecList != null) { + partValsList = partSpecList.stream() + .map(ps -> ps.stream() + .map(TPartitionKeyValue::getValue) + .collect(Collectors.toList())) + .collect(Collectors.toList()); } try { List eventIds = MetastoreShim.fireReloadEventHelper( - catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(), - tblName.getTbl(), Collections.emptyMap()); + catalog_.getMetaStoreClient(), req.isIs_refresh(), partValsList, + tblName.getDb(), tblName.getTbl(), Collections.emptyMap()); LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(), tbl.getFullName(), StringUtils.join(",", eventIds)); // Update the lastRefreshEventId accordingly @@ -7283,19 +7296,21 @@ public class CatalogOpExecutor { tbl.getFullName()); return; } - if (req.isSetPartition_spec()) { - HdfsTable hdfsTbl = (HdfsTable) tbl; - HdfsPartition partition = hdfsTbl - .getPartitionFromThriftPartitionSpec(req.getPartition_spec()); - if (partition != null) { - HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); - 
partBuilder.setLastRefreshEventId(eventIds.get(0)); - hdfsTbl.updatePartition(partBuilder); - } else { - LOG.warn("Partition {} no longer exists in table {}. It might be " + - "dropped by a concurrent operation.", - FeCatalogUtils.getPartitionName(hdfsTbl, partVals), - hdfsTbl.getFullName()); + if (partSpecList != null) { + for (int i = 0; i < partSpecList.size(); ++i) { + HdfsTable hdfsTbl = (HdfsTable) tbl; + HdfsPartition partition = hdfsTbl + .getPartitionFromThriftPartitionSpec(partSpecList.get(i)); + if (partition != null) { + HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition); + partBuilder.setLastRefreshEventId(eventIds.get(0)); + hdfsTbl.updatePartition(partBuilder); + } else { + LOG.warn("Partition {} no longer exists in table {}. It might be " + + "dropped by a concurrent operation.", + FeCatalogUtils.getPartitionName(hdfsTbl, partValsList.get(i)), + hdfsTbl.getFullName()); + } } } else { tbl.setLastRefreshEventId(eventIds.get(0)); diff --git a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java index be7ee3e5e..c62005d45 100644 --- a/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java +++ b/fe/src/main/java/org/apache/impala/util/CatalogOpUtil.java @@ -139,7 +139,9 @@ public class CatalogOpUtil { cmd += "DATABASE " + req.getDb_name(); } else if (req.isSetTable_name()) { cmd += "TABLE " + TableName.fromThrift(req.getTable_name()); - if (req.isSetPartition_spec()) cmd += " PARTITIONS"; + if (req.isSetPartition_spec() || req.isSetPartition_spec_list()) { + cmd += " PARTITIONS"; + } } else if (req.isAuthorization()) { cmd += "AUTHORIZATION"; } else { diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java index 397885a12..19f889c45 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzerTest.java @@ 
-430,6 +430,10 @@ public class AnalyzerTest extends FrontendTestBase { assertAction.accept(AnalyzesOk( "refresh functional.alltypessmall partition (year=2009, month=NULL)"), ResetMetadataStmt.Action.REFRESH_PARTITION); + assertAction.accept(AnalyzesOk( + "refresh functional.alltypessmall partition (year=2009, month=1)" + + " partition (year=2010, month=2) partition (year=2010, month=4)"), + ResetMetadataStmt.Action.REFRESH_PARTITION); assertAction.accept(AnalyzesOk( "refresh authorization", createAnalysisCtx(createAuthorizationFactory())), ResetMetadataStmt.Action.REFRESH_AUTHORIZATION); @@ -456,6 +460,10 @@ public class AnalyzerTest extends FrontendTestBase { "refresh functional.alltypessmall partition (year=2009, month='foo')", "Value of partition spec (column=month) has incompatible type: 'STRING'. " + "Expected type: 'INT'"); + AnalysisError("refresh functional.alltypessmall partition (year=2009)" + + " partition (month=1)", + "Items in partition spec must exactly match the partition columns in " + + "the table definition: functional.alltypessmall (1 vs 2)"); AnalysisError("refresh functional.zipcode_incomes partition (year=2009, month=1)", "Table is not partitioned: functional.zipcode_incomes"); AnalysisError( @@ -636,10 +644,14 @@ public class AnalyzerTest extends FrontendTestBase { AnalyzesOk("refresh functional.insert_only_transactional_table"); AnalyzesOk("refresh functional_orc_def.full_transactional_table"); AnalysisError("refresh functional.insert_only_transactional_table partition (j=1)", - "Refreshing a partition is not allowed on transactional tables. Try to refresh " + + "Refreshing partitions is not allowed on transactional tables. Try to refresh " + "the whole table instead."); + AnalysisError("refresh functional.insert_only_transactional_table partition (j=1) " + + "partition (j=2)", + "Refreshing partitions is not allowed on transactional tables. 
Try to refresh " + + "the whole table instead."); AnalysisError("refresh functional_orc_def.full_transactional_table partition (j=1)", - "Refreshing a partition is not allowed on transactional tables. Try to refresh " + + "Refreshing partitions is not allowed on transactional tables. Try to refresh " + "the whole table instead."); } diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index b2b50e697..caf288361 100755 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -3651,8 +3651,11 @@ public class ParserTest extends FrontendTestBase { ParsesOk("refresh Foo"); ParsesOk("refresh Foo.S"); ParsesOk("refresh Foo partition (col=2)"); + ParsesOk("refresh Foo partition (col=2) partition (col=3)"); ParsesOk("refresh Foo.S partition (col=2)"); ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)"); + ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3) " + + "partition (col1 = 0, col2 = 0) partition (col1 = 1, col2 = 1)"); ParsesOk("refresh functions Foo"); ParsesOk("refresh authorization"); @@ -3664,6 +3667,7 @@ public class ParserTest extends FrontendTestBase { ParserError("refresh"); ParserError("refresh Foo.S partition (col1 = 2, col2)"); ParserError("refresh Foo.S partition ()"); + ParserError("refresh Foo.S partition (col1 = 0), (col1 = 1)"); ParserError("refresh functions Foo.S"); ParserError("refresh authorization Foo"); } diff --git a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java index cc1e43804..f5a4ef93d 100644 --- a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java @@ -232,9 +232,12 @@ public class StmtMetadataLoaderTest { testNoLoad("refresh functions functional"); testNoLoad("refresh 
authorization"); - // This stmt requires the table to be loaded. + // These stmts require the table to be loaded. testLoadTables("refresh functional.alltypes partition (year=2009, month=1)", 1, 1, new String[] {"default", "functional"}, new String[] {"functional.alltypes"}); + testLoadTables("refresh functional.alltypes partition (year=2009, month=1) " + + "partition (year=2010, month=2)", 1, 1, + new String[] {"default", "functional"}, new String[] {"functional.alltypes"}); } @Test diff --git a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java index f32d4b806..138562db4 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java @@ -1891,6 +1891,8 @@ public class ToSqlTest extends FrontendTestBase { public void testRefresh() { testToSql("REFRESH functional.alltypes"); testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)"); + testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1) " + + "PARTITION (year=2010, month=2)"); testToSql("REFRESH FUNCTIONS functional"); testToSql(createAnalysisCtx(createAuthorizationFactory()), "REFRESH AUTHORIZATION"); } diff --git a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java index a1a26e449..091a2bfaa 100644 --- a/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/events/MetastoreEventsProcessorTest.java @@ -4000,7 +4000,7 @@ public class MetastoreEventsProcessorTest { eventsProcessor_.processEvents(); // Fire a reload event and process partition with empty values MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true, - Arrays.asList("1"), TEST_DB_NAME, tblName, Collections.emptyMap()); + partVals, TEST_DB_NAME, tblName, 
Collections.emptyMap()); BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES); processEventsAndVerifyStatus(prevFlag); // insert partition event diff --git a/fe/src/test/java/org/apache/impala/service/FrontendTest.java b/fe/src/test/java/org/apache/impala/service/FrontendTest.java index ee53947eb..e20586ed4 100644 --- a/fe/src/test/java/org/apache/impala/service/FrontendTest.java +++ b/fe/src/test/java/org/apache/impala/service/FrontendTest.java @@ -425,6 +425,8 @@ public class FrontendTest extends FrontendTestBase { db, Arrays.asList(db), Arrays.asList(db + ".foo")); TestCollectRequiredObjectsHelper("REFRESH mydb.foo PARTITION (p=1)", db, Arrays.asList("mydb"), Arrays.asList("mydb.foo")); + TestCollectRequiredObjectsHelper("REFRESH mydb.foo PARTITION (p=1) PARTITION (p=2)", + db, Arrays.asList("mydb"), Arrays.asList("mydb.foo")); TestCollectRequiredObjectsHelper("REFRESH foo PARTITION (p=1)", db, Arrays.asList(db), Arrays.asList(db + ".foo")); diff --git a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java index 823fd6de1..3849201d1 100644 --- a/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java +++ b/fe/src/test/java/org/apache/impala/util/CatalogOpUtilTest.java @@ -96,6 +96,10 @@ public class CatalogOpUtilTest { req.setPartition_spec(Collections.emptyList()); assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice", CatalogOpUtil.getShortDescForReset(req)); + req.unsetPartition_spec(); + req.setPartition_spec_list(Collections.emptyList()); + assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice", + CatalogOpUtil.getShortDescForReset(req)); } @Test diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py index 612bb2a28..6897424d1 100644 --- a/tests/custom_cluster/test_events_custom_configs.py +++ b/tests/custom_cluster/test_events_custom_configs.py @@ -592,18 +592,18 
@@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): .format(unique_database, test_reload_table)) EventProcessorUtils.wait_for_event_processing(self) - def check_self_events(query): + def check_self_events(query, num_events=1): tbls_refreshed_before, partitions_refreshed_before, \ events_skipped_before = self._get_self_event_metrics() last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client) self.client.execute(query) # Check if there is a reload event fired after refresh query. events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id) - assert len(events) == 1 - last_event = events[0] - assert last_event.dbName == unique_database - assert last_event.tableName == test_reload_table - assert last_event.eventType == "RELOAD" + for event in events: + assert event.dbName == unique_database + assert event.tableName == test_reload_table + assert event.eventType == "RELOAD" + assert len(events) == num_events EventProcessorUtils.wait_for_event_processing(self) tbls_refreshed_after, partitions_refreshed_after, \ events_skipped_after = self._get_self_event_metrics() @@ -612,6 +612,11 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase): check_self_events("refresh {}.{} partition(year=2022)" .format(unique_database, test_reload_table)) check_self_events("refresh {}.{}".format(unique_database, test_reload_table)) + # Refresh multiple partitions. The last two are the same. Verify only two RELOAD + # events are generated. 
+ check_self_events( + "refresh {}.{} partition(year=2022) partition(year=2023) partition(year=2023)" + .format(unique_database, test_reload_table), 2) EventProcessorUtils.wait_for_event_processing(self) if enable_sync_to_latest_event_on_ddls: diff --git a/tests/metadata/test_refresh_partition.py b/tests/metadata/test_refresh_partition.py index 18d1fbd69..8dc1d18dc 100644 --- a/tests/metadata/test_refresh_partition.py +++ b/tests/metadata/test_refresh_partition.py @@ -59,6 +59,10 @@ class TestRefreshPartition(ImpalaTestSuite): self.client.execute('refresh %s partition (y=71, z=8857)' % table_name) assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z') assert ['y=333/z=5309'] == self.hive_partition_names(table_name) + self.client.execute( + 'refresh %s partition (y=71, z=8857) partition (y=0, z=0)' % table_name) + assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z') + assert ['y=333/z=5309'] == self.hive_partition_names(table_name) def test_remove_data_and_refresh(self, unique_database): """ @@ -91,6 +95,31 @@ class TestRefreshPartition(ImpalaTestSuite): result = self.client.execute("select count(*) from %s" % table_name) assert result.data == [str('0')] + # Test multiple partitions + self.client.execute( + 'insert into table %s partition (y, z) values ' + '(2, 33, 444), (3, 44, 555), (4, 55, 666)' % table_name) + result = self.client.execute('select * from %s' % table_name) + assert '2\t33\t444' in result.data + assert '3\t44\t555' in result.data + assert '4\t55\t666' in result.data + assert len(result.data) == 3 + # Drop two partitions in Hive + self.run_stmt_in_hive( + 'alter table %s drop partition (y>33)' % table_name) + # Query the table. With file handle caching, this may not produce an error, + # because the file handles are still open in the cache. If the system does + # produce an error, it should be the expected error. 
+ try: + self.client.execute("select * from %s" % table_name) + except IMPALA_CONNECTION_EXCEPTION as e: + assert expected_error in str(e) + self.client.execute( + 'refresh %s partition (y=33, z=444) partition (y=44, z=555) ' + 'partition (y=55, z=666)' % table_name) + result = self.client.execute("select count(*) from %s" % table_name) + assert result.data == ['1'] + def test_add_delete_data_to_hdfs_and_refresh(self, unique_database): """ Data added/deleted directly in HDFS is visible in impala after refresh of @@ -106,8 +135,9 @@ class TestRefreshPartition(ImpalaTestSuite): create table %s like functional.alltypes stored as parquet location '%s' """ % (table_name, table_location)) - self.client.execute("alter table %s add partition (year=2010, month=1)" % - table_name) + for month in range(1, 5): + self.client.execute("alter table %s add partition (year=2010, month=%d)" % + (table_name, month)) self.client.execute("refresh %s" % table_name) # Check that there is no data in table result = self.client.execute("select count(*) from %s" % table_name) @@ -127,6 +157,29 @@ class TestRefreshPartition(ImpalaTestSuite): result = self.client.execute("select count(*) from %s" % table_name) assert result.data == [str(0)] + # Test multiple partitions + for month in range(2, 5): + dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, file_name) + self.filesystem_client.copy(src_file, dst_path, overwrite=True) + # Check that data added is not visible before refresh + result = self.client.execute("select count(*) from %s" % table_name) + assert result.data == ['0'] + # Check that data is visible after refresh + self.client.execute( + "refresh %s partition (year=2010, month=2) partition (year=2010, month=3) " + "partition (year=2010, month=4)" % table_name) + result = self.client.execute("select count(*) from %s" % table_name) + assert result.data == [str(file_num_rows * 3)] + # Check that after deleting the file and refreshing, it returns zero rows + for month in
range(2, 5): + dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, file_name) + check_call(["hadoop", "fs", "-rm", dst_path], shell=False) + self.client.execute( + "refresh %s partition (year=2010, month=2) partition (year=2010, month=3) " + "partition (year=2010, month=4)" % table_name) + result = self.client.execute("select count(*) from %s" % table_name) + assert result.data == ['0'] + def test_confirm_individual_refresh(self, unique_database): """ Data added directly to HDFS is only visible for the partition refreshed @@ -141,7 +194,7 @@ class TestRefreshPartition(ImpalaTestSuite): create table %s like functional.alltypes stored as parquet location '%s' """ % (table_name, table_location)) - for month in [1, 2]: + for month in range(1, 6): self.client.execute("alter table %s add partition (year=2010, month=%s)" % (table_name, month)) self.client.execute("refresh %s" % table_name) @@ -149,7 +202,7 @@ class TestRefreshPartition(ImpalaTestSuite): result = self.client.execute("select count(*) from %s" % table_name) assert result.data == [str(0)] dst_path = table_location + "/year=2010/month=%s/" + file_name - for month in [1, 2]: + for month in range(1, 6): self.filesystem_client.copy(src_file, dst_path % month, overwrite=True) # Check that data added is not visible before refresh result = self.client.execute("select count(*) from %s" % table_name) @@ -168,3 +221,9 @@ class TestRefreshPartition(ImpalaTestSuite): self.client.execute("refresh %s partition (year=2010, month=2)" % table_name) result = self.client.execute("select count(*) from %s" % table_name) assert result.data == [str(file_num_rows * 2)] + # Refresh multiple partitions + self.client.execute( + "refresh %s partition (year=2010, month=3) partition (year=2010, month=4) " + "partition (year=2010, month=5)" % table_name) + result = self.client.execute("select count(*) from %s" % table_name) + assert result.data == [str(file_num_rows * 5)]