IMPALA-14089: Support REFRESH on multiple partitions

Currently we just support REFRESH on the whole table or a specific
partition:
  REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]

If users want to refresh multiple partitions, they have to submit
multiple statements, one per partition. This has some drawbacks:
 - It requires holding the table write lock inside catalogd multiple
   times, which increases lock contention with other read/write
   operations on the same table, e.g. getPartialCatalogObject requests
   from coordinators.
 - The catalog version of the table will be bumped multiple times.
   Coordinators in local catalog mode are more likely to see different
   versions across their getPartialCatalogObject requests, so they have
   to retry planning to resolve InconsistentMetadataFetchException.
 - Partitions are reloaded sequentially. They should be reloaded in
   parallel, as we do when refreshing the whole table.

This patch extends the syntax to refresh multiple partitions in one
statement:
  REFRESH [db_name.]table_name
  [PARTITION (key_col1=val1 [, key_col2=val2...])
   [PARTITION (key_col1=val3 [, key_col2=val4...])...]]
Example:
  REFRESH foo PARTITION(p=0) PARTITION(p=1) PARTITION(p=2);

TResetMetadataRequest is extended to have a list of partition specs for
this. If the list has only one item, we still use the existing logic of
reloading a specific partition. If the list has more than one item,
partitions will be reloaded in parallel. This is implemented in
CatalogServiceCatalog#reloadTable(). Previously, it always invoked
HdfsTable#load() with partitionsToUpdate=null. Now the parameter is
populated when TResetMetadataRequest carries the partition list.
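
For illustration, here is a minimal standalone sketch of the dedup step
(not the patch code itself; KeyValue and partitionName() below are
hypothetical stand-ins for TPartitionKeyValue and
HdfsTable.constructPartitionName()):

  import java.util.*;

  // Standalone sketch, not Impala code: KeyValue and partitionName() stand in
  // for TPartitionKeyValue and HdfsTable.constructPartitionName().
  public class PartitionSpecDedup {
    record KeyValue(String col, String val) {}

    // Builds a canonical partition name such as "year=2010/month=2".
    static String partitionName(List<KeyValue> spec) {
      StringJoiner name = new StringJoiner("/");
      for (KeyValue kv : spec) name.add(kv.col() + "=" + kv.val());
      return name.toString();
    }

    // Duplicate specs collapse onto the same name; the resulting key set plays
    // the role of partitionsToUpdate handed to HdfsTable#load().
    static Set<String> toPartitionsToUpdate(List<List<KeyValue>> specs) {
      Map<String, List<KeyValue>> byName = new LinkedHashMap<>();
      for (List<KeyValue> spec : specs) byName.put(partitionName(spec), spec);
      return byName.keySet();
    }

    public static void main(String[] args) {
      System.out.println(toPartitionsToUpdate(List.of(
          List.of(new KeyValue("p", "0")),
          List.of(new KeyValue("p", "0")),     // duplicate, removed by dedup
          List.of(new KeyValue("p", "1")))));  // prints [p=0, p=1]
    }
  }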

HMS notification events of RELOAD type will be fired for each partition
if enable_reload_events is turned on. Once HIVE-28967 is resolved, we
can fire a single event for multiple partitions.
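
For illustration, a minimal standalone sketch of that per-partition event
loop (EventFirer is a hypothetical stand-in for the MetaStoreClient /
FireEventRequest API used by MetastoreShim#fireReloadEventHelper):

  import java.util.*;

  // Standalone sketch, not Impala code: EventFirer stands in for the HMS
  // fireListenerEvent API used by MetastoreShim#fireReloadEventHelper.
  public class ReloadEventSketch {
    interface EventFirer {
      // Returns the event ids assigned by the metastore for one fired event.
      List<Long> fire(String db, String table, List<String> partVals);
    }

    // Until HIVE-28967 allows batching, fire one RELOAD event per partition
    // and collect all event ids; with no partitions, fire a single
    // table-level event.
    static List<Long> fireReloadEvents(EventFirer firer, String db,
        String table, List<List<String>> partValsList) {
      if (partValsList == null || partValsList.isEmpty()) {
        return firer.fire(db, table, null);
      }
      List<Long> eventIds = new ArrayList<>();
      for (List<String> partVals : partValsList) {
        eventIds.addAll(firer.fire(db, table, partVals));
      }
      return eventIds;
    }

    public static void main(String[] args) {
      // Fake firer handing out increasing ids, just to exercise the loop.
      EventFirer fake = new EventFirer() {
        long next = 100;
        @Override public List<Long> fire(String db, String t, List<String> p) {
          return List.of(next++);
        }
      };
      System.out.println(fireReloadEvents(fake, "db1", "foo",
          List.of(List.of("0"), List.of("1"))));  // prints [100, 101]
    }
  }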

Updated docs in impala_refresh.xml.

Tests:
 - Added FE and e2e tests

Change-Id: Ie5b0deeaf23129ed6e1ba2817f54291d7f63d04e
Reviewed-on: http://gerrit.cloudera.org:8080/22938
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: stiga-huang
Date: 2025-05-22 18:33:29 +08:00
Committed-by: Impala Public Jenkins
Parent: 063b90c433
Commit: b37f4509fa
20 changed files with 268 additions and 85 deletions


@@ -325,8 +325,9 @@ struct TResetMetadataRequest {
// the entire catalog
4: optional CatalogObjects.TTableName table_name
// If set, refreshes the specified partition, otherwise
// refreshes the whole table
// Deprecated - use partition_spec_list instead. Keeps this for compatibility.
// If set, refreshes the specified partition.
// Refreshes the whole table if both this and partition_spec_list are not set.
5: optional list<CatalogObjects.TPartitionKeyValue> partition_spec
// If set, refreshes functions in the specified database.
@@ -344,6 +345,10 @@ struct TResetMetadataRequest {
// debug_action is set from the query_option when available.
10: optional string debug_action
// If set, refreshes the specified list of partitions
// Refreshes the whole table if both this and partition_spec are not set.
11: optional list<list<CatalogObjects.TPartitionKeyValue>> partition_spec_list
}
// Response from TResetMetadataRequest


@@ -10604,6 +10604,7 @@ under the License.
<keydef keys="impala132"><topicmeta><keywords><keyword>Impala 1.3.2</keyword></keywords></topicmeta></keydef>
<keydef keys="impala130"><topicmeta><keywords><keyword>Impala 1.3.0</keyword></keywords></topicmeta></keydef>
<keydef keys="impala50_full"><topicmeta><keywords><keyword>Impala 5.0</keyword></keywords></topicmeta></keydef>
<keydef keys="impala42_full"><topicmeta><keywords><keyword>Impala 4.2</keyword></keywords></topicmeta></keydef>
<keydef keys="impala34_full"><topicmeta><keywords><keyword>Impala 3.4</keyword></keywords></topicmeta></keydef>
<keydef keys="impala33_full"><topicmeta><keywords><keyword>Impala 3.3</keyword></keywords></topicmeta></keydef>


@@ -67,7 +67,9 @@ under the License.
<p conref="../shared/impala_common.xml#common/syntax_blurb"/>
<codeblock rev="IMPALA-1683">REFRESH [<varname>db_name</varname>.]<varname>table_name</varname> [PARTITION (<varname>key_col1</varname>=<varname>val1</varname> [, <varname>key_col2</varname>=<varname>val2</varname>...])]</codeblock>
<codeblock rev="IMPALA-1683">REFRESH [<varname>db_name</varname>.]<varname>table_name</varname>
[PARTITION (<varname>key_col1</varname>=<varname>val1</varname> [, <varname>key_col2</varname>=<varname>val2</varname>...])
[PARTITION (<varname>key_col1</varname>=<varname>val3</varname> [, <varname>key_col2</varname>=<varname>val4</varname>...])...]</codeblock>
<p conref="../shared/impala_common.xml#common/usage_notes_blurb"/>
@@ -115,7 +117,7 @@ under the License.
<p conref="../shared/impala_common.xml#common/refresh_vs_invalidate"/>
<p rev="IMPALA-1683">
<b>Refreshing a single partition:</b>
<b>Refreshing specific partitions:</b>
</p>
<p rev="IMPALA-1683">
@@ -125,6 +127,13 @@ under the License.
values for each of the partition key columns.
</p>
<p>
In <keyword keyref="impala50_full"/> and higher, the <codeph>REFRESH</codeph> statement
can apply to multiple partitions at a time, rather than a single partition. Use the
optional <codeph>PARTITION (<varname>partition_spec</varname>)</codeph> clause for each
of the partitions.
</p>
<p>
The following rules apply:
<ul>
@@ -164,6 +173,9 @@ refresh p2 partition (z=1, y=0)
-- Incomplete partition spec causes an error.
refresh p2 partition (y=0)
ERROR: AnalysisException: Items in partition spec must exactly match the partition columns in the table definition: default.p2 (1 vs 2)
-- Refresh multiple partitions.
refresh p2 partition (y=0, z=3) partition (y=1, z=0) partition (y=1, z=2);
]]>
</codeblock>


@@ -435,7 +435,7 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
*/
@VisibleForTesting
public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
boolean isRefresh, List<String> partVals, String dbName, String tableName,
boolean isRefresh, List<List<String>> partVals, String dbName, String tableName,
Map<String, String> selfEventParams) throws TException {
throw new UnsupportedOperationException("Reload event is not supported.");
}


@@ -524,15 +524,14 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
* @param msClient Metastore client,
* @param isRefresh if this flag is set to true then it is a refresh query, else it
* is an invalidate metadata query.
* @param partVals The partition list corresponding to
* the table, used by Apache Hive 3
* @param partValsList partition values (List<String>) for each partition
* @param dbName
* @param tableName
* @return a list of eventIds for the reload events
*/
@VisibleForTesting
public static List<Long> fireReloadEventHelper(MetaStoreClient msClient,
boolean isRefresh, List<String> partVals, String dbName, String tableName,
boolean isRefresh, List<List<String>> partValsList, String dbName, String tableName,
Map<String, String> selfEventParams) throws TException {
Preconditions.checkNotNull(msClient);
Preconditions.checkNotNull(dbName);
@@ -542,16 +541,32 @@ public class MetastoreShim extends Hive3MetastoreShimBase {
FireEventRequest rqst = new FireEventRequest(true, data);
rqst.setDbName(dbName);
rqst.setTableName(tableName);
rqst.setPartitionVals(partVals);
rqst.setTblParams(selfEventParams);
FireEventResponse response = msClient.getHiveClient().fireListenerEvent(rqst);
if (!response.isSetEventIds()) {
LOG.error("FireEventResponse does not have event ids set for table {}.{}. This "
+ "may cause the table to unnecessarily be refreshed when the " +
"refresh/invalidate event is received.", dbName, tableName);
return Collections.emptyList();
if (partValsList == null || partValsList.isEmpty()) {
FireEventResponse response = msClient.getHiveClient().fireListenerEvent(rqst);
if (!response.isSetEventIds()) {
LOG.error("FireEventResponse does not have event ids set for table {}.{}. This "
+ "may cause the table to unnecessarily be refreshed when the " +
"refresh/invalidate event is received.", dbName, tableName);
return Collections.emptyList();
}
return response.getEventIds();
}
return response.getEventIds();
List<Long> eventIds = new ArrayList<>();
// TODO: Fire one event once HIVE-28967 is resolved.
for (List<String> partVals : partValsList) {
rqst.setPartitionVals(partVals);
FireEventResponse response = msClient.getHiveClient().fireListenerEvent(rqst);
if (!response.isSetEventIds()) {
LOG.error("FireEventResponse does not have event ids set for table {}.{} " +
"partition {}. This may cause the table to unnecessarily be refreshed " +
"when the refresh/invalidate event is received.",
dbName, tableName, partVals);
continue;
}
eventIds.addAll(response.getEventIds());
}
return eventIds;
}
/**


@@ -504,6 +504,8 @@ nonterminal PartitionSpec partition_spec;
nonterminal PartitionSet opt_partition_set;
// Required partition set
nonterminal PartitionSet partition_set;
// List of one or more partition specs
nonterminal List<PartitionSpec> partition_spec_list;
nonterminal List<PartitionKeyValue> partition_clause;
nonterminal List<PartitionKeyValue> static_partition_key_value_list;
nonterminal List<PartitionKeyValue> partition_key_value_list;
@@ -845,8 +847,8 @@ reset_metadata_stmt ::=
{: RESULT = ResetMetadataStmt.createInvalidateStmt(table); :}
| KW_REFRESH table_name:table
{: RESULT = ResetMetadataStmt.createRefreshTableStmt(table); :}
| KW_REFRESH table_name:table partition_spec:partition
{: RESULT = ResetMetadataStmt.createRefreshPartitionStmt(table, partition); :}
| KW_REFRESH table_name:table partition_spec_list:partitions
{: RESULT = ResetMetadataStmt.createRefreshPartitionsStmt(table, partitions); :}
| KW_REFRESH KW_FUNCTIONS ident_or_unreserved:db
{: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :}
| KW_REFRESH KW_AUTHORIZATION
@@ -2774,6 +2776,20 @@ opt_partition_spec ::=
{: RESULT = null; :}
;
partition_spec_list ::=
partition_spec:item
{:
List<PartitionSpec> list = new ArrayList<>();
list.add(item);
RESULT = list;
:}
| partition_spec_list:list partition_spec:item
{:
list.add(item);
RESULT = list;
:}
;
static_partition_key_value_list ::=
static_partition_key_value:item
{:


@@ -68,8 +68,8 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
// database functions.
private TableName tableName_;
// not null when refreshing a single partition
private final PartitionSpec partitionSpec_;
// not null when refreshing specified partitions
private final List<PartitionSpec> partitionSpecList_;
// not null when refreshing functions in a database.
private final String database_;
@@ -87,13 +87,15 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
private String clientIp_;
private ResetMetadataStmt(Action action, String db, TableName tableName,
PartitionSpec partitionSpec) {
List<PartitionSpec> partitionSpecList) {
Preconditions.checkNotNull(action);
action_ = action;
database_ = db;
tableName_ = tableName;
partitionSpec_ = partitionSpec;
if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_);
partitionSpecList_ = partitionSpecList;
if (partitionSpecList_ != null) {
partitionSpecList_.forEach(p -> p.setTableName(tableName_));
}
}
public static ResetMetadataStmt createInvalidateStmt() {
@@ -111,10 +113,11 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
Preconditions.checkNotNull(tableName), /*partition*/ null);
}
public static ResetMetadataStmt createRefreshPartitionStmt(TableName tableName,
PartitionSpec partitionSpec) {
public static ResetMetadataStmt createRefreshPartitionsStmt(TableName tableName,
List<PartitionSpec> partitionSpecList) {
return new ResetMetadataStmt(Action.REFRESH_PARTITION, /*db*/ null,
Preconditions.checkNotNull(tableName), Preconditions.checkNotNull(partitionSpec));
Preconditions.checkNotNull(tableName),
Preconditions.checkNotNull(partitionSpecList));
}
public static ResetMetadataStmt createRefreshFunctionsStmt(String db) {
@@ -132,8 +135,6 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
@Override
public TableName getTableName() { return tableName_; }
public PartitionSpec getPartitionSpec() { return partitionSpec_; }
@VisibleForTesting
protected Action getAction() { return action_; }
@@ -144,7 +145,7 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
@Override
public void collectTableRefs(List<TableRef> tblRefs) {
if (tableName_ != null && partitionSpec_ != null) {
if (tableName_ != null && partitionSpecList_ != null) {
tblRefs.add(new TableRef(tableName_.toPath(), null));
}
}
@@ -172,20 +173,22 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
throw new AnalysisException(Analyzer.TBL_DOES_NOT_EXIST_ERROR_MSG +
tableName_);
}
if (partitionSpec_ != null) {
if (partitionSpecList_ != null) {
try {
// Get local table info without reaching out to HMS
FeTable table = analyzer.getTable(dbName, tableName_.getTbl(),
/* must_exist */ true);
if (AcidUtils.isTransactionalTable(table)) {
throw new AnalysisException("Refreshing a partition is not allowed on " +
throw new AnalysisException("Refreshing partitions is not allowed on " +
"transactional tables. Try to refresh the whole table instead.");
}
} catch (TableLoadingException e) {
throw new AnalysisException(e);
}
partitionSpec_.setPrivilegeRequirement(Privilege.ANY);
partitionSpec_.analyze(analyzer);
for (PartitionSpec ps : partitionSpecList_) {
ps.setPrivilegeRequirement(Privilege.ANY);
ps.analyze(analyzer);
}
}
} else {
FeTable tbl = analyzer.getTableNoThrow(dbName, tableName_.getTbl());
@@ -244,8 +247,8 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
result.append("REFRESH ").append(tableName_.toSql());
break;
case REFRESH_PARTITION:
result.append("REFRESH ").append(tableName_.toSql()).append(" ")
.append(partitionSpec_.toSql(options));
result.append("REFRESH ").append(tableName_.toSql());
partitionSpecList_.forEach(ps -> result.append(" ").append(ps.toSql(options)));
break;
case INVALIDATE_METADATA_ALL:
result.append("INVALIDATE METADATA");
@@ -270,7 +273,11 @@ public class ResetMetadataStmt extends StatementBase implements SingleTableStmt
if (tableName_ != null) {
params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl()));
}
if (partitionSpec_ != null) params.setPartition_spec(partitionSpec_.toThrift());
if (partitionSpecList_ != null) {
for (PartitionSpec ps : partitionSpecList_) {
params.addToPartition_spec_list(ps.toThrift());
}
}
if (database_ != null) params.setDb_name(database_);
if (action_ == Action.REFRESH_AUTHORIZATION) {
params.setAuthorization(true);


@@ -2863,7 +2863,7 @@ public class CatalogServiceCatalog extends Catalog {
CatalogObject.ThriftObjectType resultType, String reason, long eventId,
boolean isSkipFileMetadataReload, EventSequence catalogTimeline)
throws CatalogException {
LOG.info(String.format("Refreshing table metadata: %s", tbl.getFullName()));
LOG.info("Refreshing table metadata: {}", tbl.getFullName());
Preconditions.checkState(!(tbl instanceof IncompleteTable));
String dbName = tbl.getDb().getName();
String tblName = tbl.getName();
@@ -2905,10 +2905,20 @@ public class CatalogServiceCatalog extends Catalog {
dbName + "." + tblName, e);
}
if (tbl instanceof HdfsTable) {
Set<String> partitionsToUpdate = null;
if (request.getPartition_spec_listSize() > 0) {
// Convert to partition names and deduplicate partition specs.
Map<String, List<TPartitionKeyValue>> partName2PartSpec = new HashMap<>();
for (List<TPartitionKeyValue> partSpec : request.getPartition_spec_list()) {
partName2PartSpec.put(HdfsTable.constructPartitionName(partSpec), partSpec);
}
partitionsToUpdate = partName2PartSpec.keySet();
request.setPartition_spec_list(new ArrayList<>(partName2PartSpec.values()));
}
((HdfsTable) tbl)
.load(true, msClient.getHiveClient(), msTbl, !isSkipFileMetadataReload,
/* loadTableSchema*/true, request.refresh_updated_hms_partitions,
/* partitionsToUpdate*/null, request.debug_action,
partitionsToUpdate, request.debug_action,
/*partitionToEventId*/null, reason, catalogTimeline);
} else {
tbl.load(true, msClient.getHiveClient(), msTbl, reason, catalogTimeline);
@@ -3505,8 +3515,8 @@ public class CatalogServiceCatalog extends Catalog {
String reason, long newCatalogVersion, @Nullable HdfsPartition hdfsPartition,
EventSequence catalogTimeline) throws CatalogException {
Preconditions.checkState(hdfsTable.isWriteLockedByCurrentThread());
LOG.info(String.format("Refreshing partition metadata: %s %s (%s)",
hdfsTable.getFullName(), partitionName, reason));
LOG.info("Refreshing partition metadata: {} {} ({})",
hdfsTable.getFullName(), partitionName, reason);
try (MetaStoreClient msClient = getMetaStoreClient(catalogTimeline)) {
org.apache.hadoop.hive.metastore.api.Partition hmsPartition = null;
try {
@@ -3522,9 +3532,9 @@ public class CatalogServiceCatalog extends Catalog {
// non-existing partition was dropped from catalog, so we mark it as refreshed
wasPartitionReloaded.setRef(true);
} else {
LOG.info(String.format("Partition metadata for %s was not refreshed since "
LOG.info("Partition metadata for {} {} was not refreshed since "
+ "it does not exist in metastore anymore",
hdfsTable.getFullName() + " " + partitionName));
hdfsTable.getFullName(), partitionName);
}
return hdfsTable.toTCatalogObject(resultType);
} catch (Exception e) {
@@ -3540,8 +3550,8 @@ public class CatalogServiceCatalog extends Catalog {
}
hdfsTable.setCatalogVersion(newCatalogVersion);
wasPartitionReloaded.setRef(true);
LOG.info(String.format("Refreshed partition metadata: %s %s",
hdfsTable.getFullName(), partitionName));
LOG.info("Refreshed partition metadata: {} {}", hdfsTable.getFullName(),
partitionName);
return hdfsTable.toTCatalogObject(resultType);
}


@@ -1498,7 +1498,11 @@ public class HdfsTable extends Table implements FeFsTable {
* hive metastore.
* @return true if partition does not exist in metastore, else false.
*/
public abstract boolean isRemoved(HdfsPartition hdfsPartition);
public boolean isRemoved(HdfsPartition hdfsPartition) {
return isRemoved(hdfsPartition.getPartitionName());
}
public abstract boolean isRemoved(String partName);
/**
* Loads any partitions which are known to metastore but not provided in
@@ -1566,9 +1570,14 @@ public class HdfsTable extends Table implements FeFsTable {
loadTimeForFileMdNs_ += loadNewPartitions(partitionNames, addedPartitions);
// If a list of modified partitions (old and new) is specified, don't reload file
// metadata for the new ones as they have already been detected in HMS and have been
// reloaded by loadNewPartitions().
// reloaded by loadNewPartitions(). Also ignore partitions that don't exist in HMS.
if (partitionsToUpdate_ != null) {
partitionsToUpdate_.removeAll(addedPartitions);
int orgSize = partitionsToUpdate_.size();
if (partitionsToUpdate_.removeIf(this::isRemoved)) {
LOG.info("Ignored {} non-existing partitions of table {}",
orgSize - partitionsToUpdate_.size(), getFullName());
}
}
// Load file metadata. Until we have a notification mechanism for when a
// file changes in hdfs, it is sometimes required to reload all the file
@@ -1638,8 +1647,8 @@ public class HdfsTable extends Table implements FeFsTable {
}
@Override
public boolean isRemoved(HdfsPartition hdfsPartition) {
return !msPartitions_.containsKey(hdfsPartition.getPartitionName());
public boolean isRemoved(String partName) {
return !msPartitions_.containsKey(partName);
}
/**
@@ -1732,8 +1741,8 @@ public class HdfsTable extends Table implements FeFsTable {
}
@Override
public boolean isRemoved(HdfsPartition hdfsPartition) {
return !partitionNamesFromHms_.contains(hdfsPartition.getPartitionName());
public boolean isRemoved(String partName) {
return !partitionNamesFromHms_.contains(partName);
}
@Override


@@ -7158,12 +7158,14 @@ public class CatalogOpExecutor {
CatalogObject.ThriftObjectType.INVALIDATION :
CatalogObject.ThriftObjectType.FULL;
if (isTableLoadedInCatalog) {
if (req.isSetPartition_spec()) {
if (req.isSetPartition_spec() || req.getPartition_spec_listSize() == 1) {
Preconditions.checkArgument(!AcidUtils.isTransactionalTable(tbl));
List<TPartitionKeyValue> partitionSpec = req.isSetPartition_spec() ?
req.getPartition_spec() : req.getPartition_spec_list().get(0);
Reference<Boolean> wasPartitionRefreshed = new Reference<>(false);
// TODO if the partition was not really refreshed because the partSpec
// was wrong, do we still need to send back the table?
updatedThriftTable = catalog_.reloadPartition(tbl, req.getPartition_spec(),
updatedThriftTable = catalog_.reloadPartition(tbl, partitionSpec,
wasPartitionRefreshed, resultType, cmdString, catalogTimeline);
} else {
// TODO IMPALA-8809: Optimisation for partitioned tables:
@@ -7264,15 +7266,26 @@ public class CatalogOpExecutor {
*/
private void fireReloadEventAndUpdateRefreshEventId(
TResetMetadataRequest req, TableName tblName, Table tbl) {
List<String> partVals = null;
// Partition spec (List<TPartitionKeyValue>) for each partition
List<List<TPartitionKeyValue>> partSpecList = null;
// Partition values (List<String>) for each partition
List<List<String>> partValsList = null;
if (req.isSetPartition_spec()) {
partVals = req.getPartition_spec().stream().
map(TPartitionKeyValue::getValue).collect(Collectors.toList());
partSpecList = Collections.singletonList(req.partition_spec);
} else if (req.isSetPartition_spec_list()) {
partSpecList = req.partition_spec_list;
}
if (partSpecList != null) {
partValsList = partSpecList.stream()
.map(ps -> ps.stream()
.map(TPartitionKeyValue::getValue)
.collect(Collectors.toList()))
.collect(Collectors.toList());
}
try {
List<Long> eventIds = MetastoreShim.fireReloadEventHelper(
catalog_.getMetaStoreClient(), req.isIs_refresh(), partVals, tblName.getDb(),
tblName.getTbl(), Collections.emptyMap());
catalog_.getMetaStoreClient(), req.isIs_refresh(), partValsList,
tblName.getDb(), tblName.getTbl(), Collections.emptyMap());
LOG.info("Fired {} RELOAD events for table {}: {}", eventIds.size(),
tbl.getFullName(), StringUtils.join(",", eventIds));
// Update the lastRefreshEventId accordingly
@@ -7283,19 +7296,21 @@ public class CatalogOpExecutor {
tbl.getFullName());
return;
}
if (req.isSetPartition_spec()) {
HdfsTable hdfsTbl = (HdfsTable) tbl;
HdfsPartition partition = hdfsTbl
.getPartitionFromThriftPartitionSpec(req.getPartition_spec());
if (partition != null) {
HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
partBuilder.setLastRefreshEventId(eventIds.get(0));
hdfsTbl.updatePartition(partBuilder);
} else {
LOG.warn("Partition {} no longer exists in table {}. It might be " +
"dropped by a concurrent operation.",
FeCatalogUtils.getPartitionName(hdfsTbl, partVals),
hdfsTbl.getFullName());
if (partSpecList != null) {
for (int i = 0; i < partSpecList.size(); ++i) {
HdfsTable hdfsTbl = (HdfsTable) tbl;
HdfsPartition partition = hdfsTbl
.getPartitionFromThriftPartitionSpec(partSpecList.get(i));
if (partition != null) {
HdfsPartition.Builder partBuilder = new HdfsPartition.Builder(partition);
partBuilder.setLastRefreshEventId(eventIds.get(0));
hdfsTbl.updatePartition(partBuilder);
} else {
LOG.warn("Partition {} no longer exists in table {}. It might be " +
"dropped by a concurrent operation.",
FeCatalogUtils.getPartitionName(hdfsTbl, partValsList.get(i)),
hdfsTbl.getFullName());
}
}
} else {
tbl.setLastRefreshEventId(eventIds.get(0));


@@ -139,7 +139,9 @@ public class CatalogOpUtil {
cmd += "DATABASE " + req.getDb_name();
} else if (req.isSetTable_name()) {
cmd += "TABLE " + TableName.fromThrift(req.getTable_name());
if (req.isSetPartition_spec()) cmd += " PARTITIONS";
if (req.isSetPartition_spec() || req.isSetPartition_spec_list()) {
cmd += " PARTITIONS";
}
} else if (req.isAuthorization()) {
cmd += "AUTHORIZATION";
} else {


@@ -430,6 +430,10 @@ public class AnalyzerTest extends FrontendTestBase {
assertAction.accept(AnalyzesOk(
"refresh functional.alltypessmall partition (year=2009, month=NULL)"),
ResetMetadataStmt.Action.REFRESH_PARTITION);
assertAction.accept(AnalyzesOk(
"refresh functional.alltypessmall partition (year=2009, month=1)" +
" partition (year=2010, month=2) partition (year=2010, month=4)"),
ResetMetadataStmt.Action.REFRESH_PARTITION);
assertAction.accept(AnalyzesOk(
"refresh authorization", createAnalysisCtx(createAuthorizationFactory())),
ResetMetadataStmt.Action.REFRESH_AUTHORIZATION);
@@ -456,6 +460,10 @@ public class AnalyzerTest extends FrontendTestBase {
"refresh functional.alltypessmall partition (year=2009, month='foo')",
"Value of partition spec (column=month) has incompatible type: 'STRING'. "
+ "Expected type: 'INT'");
AnalysisError("refresh functional.alltypessmall partition (year=2009)" +
" partition (month=1)",
"Items in partition spec must exactly match the partition columns in "
+ "the table definition: functional.alltypessmall (1 vs 2)");
AnalysisError("refresh functional.zipcode_incomes partition (year=2009, month=1)",
"Table is not partitioned: functional.zipcode_incomes");
AnalysisError(
@@ -636,10 +644,14 @@ public class AnalyzerTest extends FrontendTestBase {
AnalyzesOk("refresh functional.insert_only_transactional_table");
AnalyzesOk("refresh functional_orc_def.full_transactional_table");
AnalysisError("refresh functional.insert_only_transactional_table partition (j=1)",
"Refreshing a partition is not allowed on transactional tables. Try to refresh " +
"Refreshing partitions is not allowed on transactional tables. Try to refresh " +
"the whole table instead.");
AnalysisError("refresh functional.insert_only_transactional_table partition (j=1) " +
"partition (j=2)",
"Refreshing partitions is not allowed on transactional tables. Try to refresh " +
"the whole table instead.");
AnalysisError("refresh functional_orc_def.full_transactional_table partition (j=1)",
"Refreshing a partition is not allowed on transactional tables. Try to refresh " +
"Refreshing partitions is not allowed on transactional tables. Try to refresh " +
"the whole table instead.");
}


@@ -3651,8 +3651,11 @@ public class ParserTest extends FrontendTestBase {
ParsesOk("refresh Foo");
ParsesOk("refresh Foo.S");
ParsesOk("refresh Foo partition (col=2)");
ParsesOk("refresh Foo partition (col=2) partition (col=3)");
ParsesOk("refresh Foo.S partition (col=2)");
ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)");
ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3) " +
"partition (col1 = 0, col2 = 0) partition (col1 = 1, col2 = 1)");
ParsesOk("refresh functions Foo");
ParsesOk("refresh authorization");
@@ -3664,6 +3667,7 @@ public class ParserTest extends FrontendTestBase {
ParserError("refresh");
ParserError("refresh Foo.S partition (col1 = 2, col2)");
ParserError("refresh Foo.S partition ()");
ParserError("refresh Foo.S partition (col1 = 0), (col1 = 1)");
ParserError("refresh functions Foo.S");
ParserError("refresh authorization Foo");
}


@@ -232,9 +232,12 @@ public class StmtMetadataLoaderTest {
testNoLoad("refresh functions functional");
testNoLoad("refresh authorization");
// This stmt requires the table to be loaded.
// These stmts require the table to be loaded.
testLoadTables("refresh functional.alltypes partition (year=2009, month=1)", 1, 1,
new String[] {"default", "functional"}, new String[] {"functional.alltypes"});
testLoadTables("refresh functional.alltypes partition (year=2009, month=1) " +
"partition (year=2010, month=2)", 1, 1,
new String[] {"default", "functional"}, new String[] {"functional.alltypes"});
}
@Test


@@ -1891,6 +1891,8 @@ public class ToSqlTest extends FrontendTestBase {
public void testRefresh() {
testToSql("REFRESH functional.alltypes");
testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1)");
testToSql("REFRESH functional.alltypes PARTITION (year=2009, month=1) " +
"PARTITION (year=2010, month=2)");
testToSql("REFRESH FUNCTIONS functional");
testToSql(createAnalysisCtx(createAuthorizationFactory()), "REFRESH AUTHORIZATION");
}


@@ -4000,7 +4000,7 @@ public class MetastoreEventsProcessorTest {
eventsProcessor_.processEvents();
// Fire a reload event and process partition with empty values
MetastoreShim.fireReloadEventHelper(catalog_.getMetaStoreClient(), true,
Arrays.asList("1"), TEST_DB_NAME, tblName, Collections.emptyMap());
partVals, TEST_DB_NAME, tblName, Collections.emptyMap());
BackendConfig.INSTANCE.setDebugActions(DebugUtils.MOCK_EMPTY_PARTITION_VALUES);
processEventsAndVerifyStatus(prevFlag);
// insert partition event


@@ -425,6 +425,8 @@ public class FrontendTest extends FrontendTestBase {
db, Arrays.asList(db), Arrays.asList(db + ".foo"));
TestCollectRequiredObjectsHelper("REFRESH mydb.foo PARTITION (p=1)",
db, Arrays.asList("mydb"), Arrays.asList("mydb.foo"));
TestCollectRequiredObjectsHelper("REFRESH mydb.foo PARTITION (p=1) PARTITION (p=2)",
db, Arrays.asList("mydb"), Arrays.asList("mydb.foo"));
TestCollectRequiredObjectsHelper("REFRESH foo PARTITION (p=1)",
db, Arrays.asList(db), Arrays.asList(db + ".foo"));


@@ -96,6 +96,10 @@ public class CatalogOpUtilTest {
req.setPartition_spec(Collections.emptyList());
assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice",
CatalogOpUtil.getShortDescForReset(req));
req.unsetPartition_spec();
req.setPartition_spec_list(Collections.emptyList());
assertEquals("REFRESH TABLE default.tbl PARTITIONS issued by Alice",
CatalogOpUtil.getShortDescForReset(req));
}
@Test


@@ -592,18 +592,18 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
.format(unique_database, test_reload_table))
EventProcessorUtils.wait_for_event_processing(self)
def check_self_events(query):
def check_self_events(query, num_events=1):
tbls_refreshed_before, partitions_refreshed_before, \
events_skipped_before = self._get_self_event_metrics()
last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
self.client.execute(query)
# Check if there is a reload event fired after refresh query.
events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
assert len(events) == 1
last_event = events[0]
assert last_event.dbName == unique_database
assert last_event.tableName == test_reload_table
assert last_event.eventType == "RELOAD"
for event in events:
assert event.dbName == unique_database
assert event.tableName == test_reload_table
assert event.eventType == "RELOAD"
assert len(events) == num_events
EventProcessorUtils.wait_for_event_processing(self)
tbls_refreshed_after, partitions_refreshed_after, \
events_skipped_after = self._get_self_event_metrics()
@@ -612,6 +612,11 @@ class TestEventProcessingCustomConfigs(TestEventProcessingCustomConfigsBase):
check_self_events("refresh {}.{} partition(year=2022)"
.format(unique_database, test_reload_table))
check_self_events("refresh {}.{}".format(unique_database, test_reload_table))
# Refresh multiple partitions. The last two are the same. Verify only two RELOAD
# events are generated.
check_self_events(
"refresh {}.{} partition(year=2022) partition(year=2023) partition(year=2023)"
.format(unique_database, test_reload_table), 2)
EventProcessorUtils.wait_for_event_processing(self)
if enable_sync_to_latest_event_on_ddls:


@@ -59,6 +59,10 @@ class TestRefreshPartition(ImpalaTestSuite):
self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
self.client.execute(
'refresh %s partition (y=71, z=8857) partition (y=0, z=0)' % table_name)
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
def test_remove_data_and_refresh(self, unique_database):
"""
@@ -91,6 +95,31 @@ class TestRefreshPartition(ImpalaTestSuite):
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str('0')]
# Test multiple partitions
self.client.execute(
'insert into table %s partition (y, z) values '
'(2, 33, 444), (3, 44, 555), (4, 55, 666)' % table_name)
result = self.client.execute('select * from %s' % table_name)
assert '2\t33\t444' in result.data
assert '3\t44\t555' in result.data
assert '4\t55\t666' in result.data
assert len(result.data) == 3
# Drop two partitions in Hive
self.run_stmt_in_hive(
'alter table %s drop partition (y>33)' % table_name)
# Query the table. With file handle caching, this may not produce an error,
# because the file handles are still open in the cache. If the system does
# produce an error, it should be the expected error.
try:
self.client.execute("select * from %s" % table_name)
except IMPALA_CONNECTION_EXCEPTION as e:
assert expected_error in str(e)
self.client.execute(
'refresh %s partition (y=33, z=444) partition (y=44, z=555) '
'partition (y=55, z=666)' % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == ['1']
def test_add_delete_data_to_hdfs_and_refresh(self, unique_database):
"""
Data added/deleted directly in HDFS is visible in impala after refresh of
@@ -106,8 +135,9 @@ class TestRefreshPartition(ImpalaTestSuite):
create table %s like functional.alltypes stored as parquet
location '%s'
""" % (table_name, table_location))
self.client.execute("alter table %s add partition (year=2010, month=1)" %
table_name)
for month in range(1, 5):
self.client.execute("alter table %s add partition (year=2010, month=%d)" %
(table_name, month))
self.client.execute("refresh %s" % table_name)
# Check that there is no data in table
result = self.client.execute("select count(*) from %s" % table_name)
@@ -127,6 +157,29 @@ class TestRefreshPartition(ImpalaTestSuite):
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
# Test multiple partitions
for month in range(2, 5):
dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, file_name)
self.filesystem_client.copy(src_file, dst_path, overwrite=True)
# Check that data added is not visible before refresh
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == ['0']
# Check that data is visible after refresh
self.client.execute(
"refresh %s partition (year=2010, month=2) partition (year=2010, month=3) "
"partition (year=2010, month=4)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(file_num_rows * 3)]
# Check that after deleting the file and refreshing, it returns zero rows
for month in range(2, 5):
dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, file_name)
check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
self.client.execute(
"refresh %s partition (year=2010, month=2) partition (year=2010, month=3) "
"partition (year=2010, month=4)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == ['0']
def test_confirm_individual_refresh(self, unique_database):
"""
Data added directly to HDFS is only visible for the partition refreshed
@@ -141,7 +194,7 @@ class TestRefreshPartition(ImpalaTestSuite):
create table %s like functional.alltypes stored as parquet
location '%s'
""" % (table_name, table_location))
for month in [1, 2]:
for month in range(1, 6):
self.client.execute("alter table %s add partition (year=2010, month=%s)" %
(table_name, month))
self.client.execute("refresh %s" % table_name)
@@ -149,7 +202,7 @@ class TestRefreshPartition(ImpalaTestSuite):
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
dst_path = table_location + "/year=2010/month=%s/" + file_name
for month in [1, 2]:
for month in range(1, 6):
self.filesystem_client.copy(src_file, dst_path % month, overwrite=True)
# Check that data added is not visible before refresh
result = self.client.execute("select count(*) from %s" % table_name)
@@ -168,3 +221,9 @@ class TestRefreshPartition(ImpalaTestSuite):
self.client.execute("refresh %s partition (year=2010, month=2)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(file_num_rows * 2)]
# Refresh multiple partitions
self.client.execute(
"refresh %s partition (year=2010, month=3) partition (year=2010, month=4) "
"partition (year=2010, month=5)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(file_num_rows * 5)]