IMPALA-10871: Add MetastoreShim to support Apache Hive 3.1.2

Like IMPALA-8369, this patch adds a compatibility shim in fe so that
Impala can interoperate with Hive 3.1.2. It adds a new MetastoreShim
class under the compat-apache-hive-3 directory. These shim classes
implement the methods that differ between cdp-hive-3 and
apache-hive-3 and are used by frontend code. At build time, based on
the environment variable IMPALA_HIVE_DIST_TYPE, one of the two shims
is added as a source directory via the fe/pom.xml build plugin.
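
For illustration (this class is hypothetical and not part of the
patch), a minimal sketch of how frontend code stays
distribution-agnostic: callers use only the static MetastoreShim API,
and the build decides which shim source tree provides the
implementation. It assumes the Impala fe classpath and uses only shim
methods added in this patch:

// Hypothetical caller, for illustration only; not an actual Impala class.
package org.apache.impala.example;

import java.util.Arrays;
import org.apache.impala.compat.MetastoreShim;

public class ShimUsageSketch {
  public static void main(String[] args) {
    // The same calls compile against either compat-cdp-hive-3 or
    // compat-apache-hive-3, because exactly one MetastoreShim source
    // tree is added as a source root at build time.
    String type = MetastoreShim.mapToInternalTableType("MATERIALIZED_VIEW");
    String partName = MetastoreShim.makePartName(
        Arrays.asList("year", "month"), Arrays.asList("2021", "08"));
    System.out.println(type + " " + partName);
  }
}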

Some code that directly uses Hive 4 APIs needs to be excluded from
compilation, e.g. fe/src/main/java/org/apache/impala/catalog/metastore/.
A Maven profile is used to exclude this code; the profile is
activated automatically based on IMPALA_HIVE_DIST_TYPE.
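
As a further hedged sketch (hypothetical helper, not code from the
patch): the apache-hive-3 shim stubs the CDP-only entry points so
that they throw UnsupportedOperationException, which lets shared
frontend code probe for a feature instead of referencing the excluded
catalog/metastore classes directly:

// Hypothetical guard, for illustration only; not an actual Impala class.
package org.apache.impala.example;

import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.compat.MetastoreShim;

public class CdpOnlyFeatureProbe {
  // Returns false when the feature is unavailable, e.g. when Impala is
  // built with IMPALA_HIVE_DIST_TYPE=apache-hive.
  public static boolean tryRefreshFileMetadata(
      CatalogServiceCatalog catalog, HdfsTable table) throws CatalogException {
    try {
      MetastoreShim.getPartitionsForRefreshingFileMetadata(catalog, table);
      return true;
    } catch (UnsupportedOperationException e) {
      return false;
    }
  }
}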

Testing:
1. Code compiles and runs against both HMS-3 and ASF-HMS-3
2. Ran the full suite of tests against HMS-3
3. Running the full tests against ASF-HMS-3 will need more work to
support Tez in the mini-cluster (for data loading) and HMS
transaction support. This will be an ongoing effort, and test
failures on ASF-Hive-3 will be fixed in additional sub-tasks.

Notes:
1. The patch uses a custom build of Apache Hive to be deployed in the
mini-cluster. This build has the fixes for HIVE-21569 and HIVE-20038.
This hack will be added to the build script in additional sub-tasks.

Change-Id: I9f08db5f6da735ac431819063060941f0941f606
Reviewed-on: http://gerrit.cloudera.org:8080/17774
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Fucun Chu
Date: 2021-08-19 11:23:30 +08:00
Committed by: Impala Public Jenkins
Parent: b99534e073
Commit: 4186727fe6
27 changed files with 2514 additions and 1030 deletions

View File

@@ -44,6 +44,8 @@ CONFIGS=(
"-skiptests -noclean -asan"
"-skiptests -noclean -tsan"
"-skiptests -noclean -ubsan -so -ninja"
# USE_APACHE_HIVE=true build:
"-skiptests -noclean -use_apache_hive"
)
FAILED=""
@@ -63,7 +65,14 @@ function onexit {
trap onexit EXIT
for CONFIG in "${CONFIGS[@]}"; do
DESCRIPTION="Options $CONFIG"
CONFIG2=${CONFIG/-use_apache_hive/}
if [[ "$CONFIG" != "$CONFIG2" ]]; then
CONFIG=$CONFIG2
export USE_APACHE_HIVE=true
else
export USE_APACHE_HIVE=false
fi
DESCRIPTION="Options $CONFIG USE_APACHE_HIVE=$USE_APACHE_HIVE"
if [[ $# == 1 && $1 == "--dryrun" ]]; then
echo $DESCRIPTION

View File

@@ -130,6 +130,7 @@ testdata/AllTypesErrorNoNulls/*.txt
*.avsc
*.parq
*.parquet
testdata/cluster/hive/*.diff
testdata/cluster/node_templates/cdh5/etc/hadoop/conf/*.xml.tmpl
testdata/cluster/node_templates/common/etc/kudu/*.conf.tmpl
testdata/cluster/node_templates/common/etc/hadoop/conf/*.xml.tmpl

View File

@@ -419,10 +419,8 @@ bootstrap_dependencies() {
"$IMPALA_HOME/bin/bootstrap_toolchain.py"
echo "Toolchain bootstrap complete."
fi
# HIVE-22915
if [[ "${USE_APACHE_HIVE}" = true ]]; then
rm $HIVE_HOME/lib/guava-*jar
cp $HADOOP_HOME/share/hadoop/hdfs/lib/guava-*.jar $HIVE_HOME/lib/
"$IMPALA_HOME/testdata/bin/patch_hive.sh"
fi
}

View File

@@ -756,7 +756,7 @@ under the License.
-->
<source>${project.basedir}/generated-sources/gen-java</source>
<source>${project.build.directory}/generated-sources/cup</source>
<source>${project.basedir}/src/compat-hive-${hive.major.version}/java</source>
<source>${project.basedir}/src/compat-${hive.dist.type}-${hive.major.version}/java</source>
</sources>
</configuration>
</execution>
@@ -1053,6 +1053,38 @@ under the License.
</build>
</profile>
<!-- Profile which is automatically activated based on the value of the
IMPALA_HIVE_DIST_TYPE environment -->
<profile>
<id>apache-hive-3</id>
<activation>
<property>
<name>env.IMPALA_HIVE_DIST_TYPE</name>
<value>apache-hive</value>
</property>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<excludes>
<exclude>**/org/apache/impala/catalog/metastore/*.java</exclude>
<exclude>**/org/apache/impala/catalog/CatalogHmsAPIHelper.java</exclude>
<exclude>**/org/apache/impala/catalog/CompactionInfoLoader.java</exclude>
</excludes>
<testExcludes>
<testExclude>**/org/apache/impala/catalog/metastore/*.java</testExclude>
<testExclude>**/org/apache/impala/testutil/Catalog*.java</testExclude>
</testExcludes>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<!-- Profile which is automatically activated when building from
within Eclipse based on the presence of the m2e.version
property -->

View File

@@ -0,0 +1,916 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.compat;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_TABLE;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Client;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.json.JSONDropDatabaseMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.Hive3MetastoreShimBase;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEvent;
import org.apache.impala.catalog.events.MetastoreNotificationException;
import org.apache.impala.catalog.events.SelfEventContext;
import org.apache.impala.catalog.local.MetaProvider.PartitionMetadata;
import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Metrics;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
import org.apache.impala.util.MetaStoreUtil.TableInsertEventInfo;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A wrapper around some of Hive's Metastore APIs to abstract away differences
* between major versions of different Hive distributions. This implements the shimmed
* methods for Apache Hive 3.
*/
public class MetastoreShim extends Hive3MetastoreShimBase {
private static final Logger LOG = LoggerFactory.getLogger(MetastoreShim.class);
public static final byte ACCESSTYPE_NONE = (byte) 1;
public static final byte ACCESSTYPE_READONLY = (byte) 2;
public static final byte ACCESSTYPE_WRITEONLY = (byte) 4;
public static final byte ACCESSTYPE_READWRITE = (byte) 8;
private static final String ACCESSTYPE = "accessType";
private static final String WRITEID = "writeId";
private static final String ID = "id";
private static final String MANAGEDLOCATIONURI = "managedLocationUri";
private static final String CONNECTORREAD = "CONNECTORREAD";
private static final String CONNECTORWRITE = "CONNECTORWRITE";
private static List<String> processorCapabilities = Lists.newArrayList();
/**
* Wrapper around IMetaStoreClient.alter_table with validWriteIds as a param.
*/
public static void alterTableWithTransaction(IMetaStoreClient client,
Table tbl, TblTransaction tblTxn)
throws ImpalaRuntimeException {
throw new UnsupportedOperationException(
"alterTableWithTransaction is not supported.");
}
/**
* Wrapper around IMetaStoreClient.alter_partitions with transaction information
*/
public static void alterPartitionsWithTransaction(IMetaStoreClient client,
String dbName, String tblName, List<Partition> partitions, TblTransaction tblTxn
) throws InvalidOperationException, MetaException, TException {
throw new UnsupportedOperationException(
"alterPartitionsWithTransaction is not supported.");
}
/**
* Wrapper around IMetaStoreClient.getTableColumnStatistics() to deal with added
* arguments.
*/
public static List<ColumnStatisticsObj> getTableColumnStatistics(
IMetaStoreClient client, String dbName, String tableName, List<String> colNames)
throws NoSuchObjectException, MetaException, TException {
return client.getTableColumnStatistics(dbName, tableName, colNames);
}
/**
* Wrapper around IMetaStoreClient.deleteTableColumnStatistics() to deal with added
* arguments.
*/
public static boolean deleteTableColumnStatistics(IMetaStoreClient client,
String dbName, String tableName, String colName)
throws NoSuchObjectException, MetaException, InvalidObjectException, TException,
InvalidInputException {
return client.deleteTableColumnStatistics(dbName, tableName, colName);
}
/**
* Wrapper around ColumnStatistics c'tor to deal with the added engine property.
*/
public static ColumnStatistics createNewHiveColStats() {
ColumnStatistics colStats = new ColumnStatistics();
return colStats;
}
/**
* Method which maps Metastore's TableType to Impala's table type. (In metastore 2,
* materialized views are not supported.)
*/
public static String mapToInternalTableType(String typeStr) {
String defaultTableType = TABLE_TYPE_TABLE;
TableType tType;
if (typeStr == null) return defaultTableType;
try {
tType = TableType.valueOf(typeStr.toUpperCase());
} catch (Exception e) {
return defaultTableType;
}
switch (tType) {
case EXTERNAL_TABLE:
case MANAGED_TABLE:
return TABLE_TYPE_TABLE;
case VIRTUAL_VIEW:
case MATERIALIZED_VIEW:
return TABLE_TYPE_VIEW;
default:
return defaultTableType;
}
}
/**
* Wrapper method which returns HMS-3 Message factory in case Impala is
* building against Apache Hive-3
*/
public static MessageDeserializer getMessageDeserializer() {
return MessageFactory.getInstance().getDeserializer();
}
/**
* Wrapper around FileUtils.makePartName to deal with package relocation in Hive 3.
* This method uses the metastore's FileUtils method instead of one from hive-exec
* @param partitionColNames
* @param values
* @return
*/
public static String makePartName(List<String> partitionColNames, List<String> values) {
return FileUtils.makePartName(partitionColNames, values);
}
/**
* Wrapper method around message factory's build alter table message due to added
* arguments in hive 3.
*/
@VisibleForTesting
public static AlterTableMessage buildAlterTableMessage(Table before, Table after,
boolean isTruncateOp, long writeId) {
return JSONMessageFactory.getInstance().buildAlterTableMessage(before, after,
isTruncateOp);
}
/**
* Wrapper around HMS-3 message serializer
* @param message
* @return serialized string to use used in the NotificationEvent's message field
*/
@VisibleForTesting
public static String serializeEventMessage(EventMessage message) {
return message.toString();
}
/**
* Get valid write ids from HMS for the acid table
* @param client the client to access HMS
* @param tableFullName the name for the table
* @return ValidWriteIdList object
*/
public static ValidWriteIdList fetchValidWriteIds(IMetaStoreClient client,
String tableFullName) throws TException {
// fix HIVE-20929
ValidTxnList txns = client.getValidTxns();
List<String> tablesList = Collections.singletonList(tableFullName);
List<TableValidWriteIds> writeIdList = client
.getValidWriteIds(tablesList, txns.toString());
return TxnUtils.createValidReaderWriteIdList(writeIdList.get(0));
}
/**
* Wrapper around HMS Partition object to get writeID
* WriteID is introduced in ACID 2
* It is used to detect changes of the partition
*/
public static long getWriteIdFromMSPartition(Partition partition) {
Preconditions.checkNotNull(partition);
return NumberUtils.toLong(partition.getParameters().get(WRITEID), -1);
}
/**
* Wrapper around HMS Partition object to set writeID
* WriteID is introduced in ACID 2
* It is used to detect changes of the partition
*/
public static void setWriteIdToMSPartition(Partition partition, long writeId) {
Preconditions.checkNotNull(partition);
partition.getParameters().put(WRITEID, String.valueOf(writeId));
}
/**
* Wrapper around HMS Table object to get writeID
* Per table writeId is introduced in ACID 2
* It is used to detect changes of the table
*/
public static long getWriteIdFromMSTable(Table msTbl) {
Preconditions.checkNotNull(msTbl);
return NumberUtils.toLong(msTbl.getParameters().get(WRITEID), -1);
}
/**
* Set impala capabilities to check the accessType of the table created by hive
* Impala supports:
* - external table read/write
* - insert-only Acid table read
* - virtual view read
* - materialized view read
*/
public static synchronized void setHiveClientCapabilities() {
if (capabilitiestSet_) return;
String[] capabilities = new String[] {
EXTWRITE, // External table write
EXTREAD, // External table read
HIVEMANAGEDINSERTREAD, // Insert-only table read
HIVEMANAGEDINSERTWRITE, // Insert-only table write
HIVEFULLACIDREAD,
HIVEFULLACIDWRITE,
HIVESQL,
HIVEMQT,
HIVEBUCKET2 // Includes the capability to get the correct bucket number.
// Currently, without this capability, for an external bucketed
// table, Hive will return the table as Read-only with bucket
// number -1. It makes clients unable to know it is a bucketed table.
// TODO: will remove this capability when Hive can provide
// API calls to tell the changing of bucket number.
};
processorCapabilities = Lists.newArrayList(capabilities);
capabilitiestSet_ = true;
}
/**
* Check if a table has a capability
* @param msTbl hms table
* @param requiredCapability hive access types or combination of them
* @return true if the table has the capability
*/
public static boolean hasTableCapability(Table msTbl, byte requiredCapability) {
Preconditions.checkNotNull(msTbl);
// access types in binary:
// ACCESSTYPE_NONE: 00000001
// ACCESSTYPE_READONLY: 00000010
// ACCESSTYPE_WRITEONLY: 00000100
// ACCESSTYPE_READWRITE: 00001000
return requiredCapability != ACCESSTYPE_NONE
&& ((getAccessType(msTbl) & requiredCapability) != 0);
}
/**
* Get Access type in string
* @param msTbl hms table
* @return the string represents the table access type.
*/
public static String getTableAccessType(Table msTbl) {
Preconditions.checkNotNull(msTbl);
switch (getAccessType(msTbl)) {
case ACCESSTYPE_READONLY:
return "READONLY";
case ACCESSTYPE_WRITEONLY:
return "WRITEONLY";
case ACCESSTYPE_READWRITE:
return "READWRITE";
case ACCESSTYPE_NONE:
default:
return "NONE";
}
}
/**
* Set table access type. This is useful for hms Table object constructed for create
* table statement. For example, to create a table, we need Read/Write capabilities
* not default 0(not defined)
*/
public static void setTableAccessType(Table msTbl, byte accessType) {
Preconditions.checkNotNull(msTbl);
msTbl.getParameters().put(ACCESSTYPE, String.valueOf(accessType));
}
/**
* CDP Hive-3 only function
*/
public static void setTableColumnStatsTransactional(IMetaStoreClient client,
Table msTbl, ColumnStatistics colStats, TblTransaction tblTxn)
throws ImpalaRuntimeException {
throw new UnsupportedOperationException(
"setTableColumnStatsTransactional is not supported.");
}
/**
* Fire insert events for table and partition.
* In case of any exception, we just log the failure of firing insert events.
*/
public static List<Long> fireInsertEvents(MetaStoreClient msClient,
TableInsertEventInfo insertEventInfo, String dbName, String tableName) {
Stopwatch sw = Stopwatch.createStarted();
try {
if (insertEventInfo.isTransactional()) {
// if the table is transactional we use a different API to fire these
// events. Note that we don't really need the event ids here for self-event
// detection since these events are not fetched by the EventsProcessor later
// These events are mostly required for incremental replication in Hive
fireInsertTransactionalEventHelper(msClient.getHiveClient(),
insertEventInfo, dbName, tableName);
} else {
return fireInsertEventHelper(msClient.getHiveClient(),
insertEventInfo.getInsertEventReqData(),
insertEventInfo.getInsertEventPartVals(), dbName,
tableName);
}
} catch (Exception e) {
LOG.error("Failed to fire insert event. Some tables might not be"
+ " refreshed on other impala clusters.", e);
} finally {
LOG.info("Time taken to fire insert events on table {}.{}: {} msec", dbName,
tableName, sw.stop().elapsed(TimeUnit.MILLISECONDS));
msClient.close();
}
return Collections.emptyList();
}
/**
* CDP Hive-3 only function
* Fires a listener event of the type ACID_WRITE on a transactional table in metastore.
* This event is polled by other external systems to detect insert operations into
* ACID tables.
* @throws TException in case of errors during HMS API call.
*/
private static void fireInsertTransactionalEventHelper(
IMetaStoreClient hiveClient, TableInsertEventInfo insertEventInfo, String dbName,
String tableName) throws TException {
throw new UnsupportedOperationException(
"fireInsertTransactionalEventHelper is not supported.");
}
/**
* Fires an insert event to HMS notification log. In Hive-3 for partitioned table,
* all partition insert events will be fired by a bulk API.
*
* @param msClient Metastore client,
* @param insertEventDataList A list of insert event info encapsulating the information
* needed to fire insert events.
* @param insertEventPartValList The partition list corresponding to
* insertEventDataList, used by Apache Hive 3
* @param dbName
* @param tableName
* @return a list of eventIds for the insert events
*/
@VisibleForTesting
public static List<Long> fireInsertEventHelper(IMetaStoreClient msClient,
List<InsertEventRequestData> insertEventDataList,
List<List<String>> insertEventPartValList, String dbName, String tableName)
throws TException {
Preconditions.checkNotNull(msClient);
Preconditions.checkNotNull(dbName);
Preconditions.checkNotNull(tableName);
Preconditions.checkState(!insertEventDataList.isEmpty(), "Atleast one insert event "
+ "info must be provided.");
Preconditions.checkState(insertEventDataList.size() == insertEventPartValList.size());
LOG.debug(String.format(
"Firing %s insert event(s) for %s.%s", insertEventDataList.size(), dbName,
tableName));
for (int i = 0; i < insertEventDataList.size(); i++) {
InsertEventRequestData insertEventData = insertEventDataList.get(i);
List<String> partitionVals = insertEventPartValList.get(i);
FireEventRequestData data = new FireEventRequestData();
FireEventRequest rqst = new FireEventRequest(true, data);
rqst.setDbName(dbName);
rqst.setTableName(tableName);
if (partitionVals != null && !partitionVals.isEmpty()) {
rqst.setPartitionVals(partitionVals);
}
data.setInsertData(insertEventData);
msClient.fireListenerEvent(rqst);
}
//TODO: IMPALA-8632: Add support for self-event detection for insert events
return Collections.EMPTY_LIST;
}
/**
* Use thrift API directly instead of HiveMetastoreClient#getNextNotification because
* the HMS client can throw an IllegalStateException when there is a gap between the
* eventIds returned.
*
* @see org.apache.hadoop.hive.metastore.HiveMetaStoreClient#getNextNotification
*/
public static NotificationEventResponse getNextNotification(IMetaStoreClient msClient,
NotificationEventRequest eventRequest) throws TException {
return getThriftClient(msClient).get_next_notification(eventRequest);
}
private static ThriftHiveMetastore.Client getThriftClient(IMetaStoreClient msClient)
throws MetaException {
try {
// The HMS client does not expose the getThriftClient function, which is obtained
// by reflection here.
if (Proxy.isProxyClass(msClient.getClass())) {
RetryingMetaStoreClient handler =
(RetryingMetaStoreClient) Proxy.getInvocationHandler(msClient);
msClient = (IMetaStoreClient) FieldUtils.readField(handler, "base",
true);
}
Object client = FieldUtils.readField(msClient, "client", true);
if (client == null) {
throw new MetaException("Client is not initialized");
}
if (!(client instanceof ThriftHiveMetastore.Client)) {
throw new MetaException("getThriftClient is only supported in remote metastore "
+ "mode.");
}
return (Client) client;
} catch (IllegalAccessException e) {
throw new MetaException("getThriftClient() fail: " + e.getMessage());
}
}
/**
* Wrapper around Database.setManagedLocationUri() to deal with added arguments.
*/
public static void setManagedLocationUri(Database db, String managedLocation) {
db.getParameters().put(MANAGEDLOCATIONURI, managedLocation);
}
/**
* Wrapper around HMS Database object to get managedLocationUri.
*/
public static String getManagedLocationUri(Database db) {
if (db.getParameters().containsKey(MANAGEDLOCATIONURI)) {
return db.getParameters().get(MANAGEDLOCATIONURI);
}
return null;
}
/**
* Wrapper around HMS Table object to get table id.
*/
public static long getTableId(Table tbl) {
return NumberUtils.toLong(tbl.getParameters().get(ID), -1);
}
/**
* Wrapper around JSONDropDatabaseMessage to get database.
*/
public static Database getDatabaseObject(JSONDropDatabaseMessage dropDatabaseMessage) {
Database database = new Database();
database.setName(dropDatabaseMessage.getDB());
return database;
}
/**
* Wrapper around IMetaStoreClient.truncateTable() to deal with added arguments.
*/
public static void truncateTable(IMetaStoreClient msClient, String dbName,
String tableName, List<String> partNames,
String validWriteIds, long writeId) throws TException {
msClient.truncateTable(dbName, tableName, partNames);
}
/**
* Wrapper around IMetaStoreClient.listPartitions() to deal with added arguments.
*/
public static List<Partition> getPartitions(IMetaStoreClient msClient,
String testDbName, String testTblName) throws TException {
return msClient.listPartitions(testDbName, testTblName, (short) -1);
}
/**
* CDP Hive-3 only function.
* For compatibility, fireInsertEventHelper() added the partitionValList parameter
*/
public static void setPartitionVal(InsertEventRequestData insertEventRequestData,
List<String> partVals) {
}
/**
* CDP Hive-3 only function.
*/
public static void addToSubDirectoryList(InsertEventRequestData insertEventRequestData,
String acidDirPath) {
throw new UnsupportedOperationException("addToSubDirectoryList is not supported.");
}
/**
* CDP Hive-3 only function.
*/
public static List<HdfsPartition.Builder> getPartitionsForRefreshingFileMetadata(
CatalogServiceCatalog catalog, HdfsTable hdfsTable) throws CatalogException {
throw new UnsupportedOperationException(
"getPartitionsForRefreshingFileMetadata is not supported.");
}
/**
* CDP Hive-3 only function.
*/
public static List<PartitionRef> checkLatestCompaction(MetaStoreClientPool msClientPool,
String dbName, String tableName, TableMetaRef table,
Map<PartitionRef, PartitionMetadata> metas, String unPartitionedName)
throws TException {
throw new UnsupportedOperationException("checkLatestCompaction is not supported.");
}
/**
* CDP Hive-3 only function.
*/
public static ICatalogMetastoreServer getCatalogMetastoreServer(
CatalogOpExecutor catalogOpExecutor) {
throw new UnsupportedOperationException(
"getCatalogMetastoreServer is not supported.");
}
/**
* CDP Hive-3 only function.
*/
public static class CommitTxnEvent extends MetastoreEvent {
public CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
NotificationEvent event) {
super(catalogOpExecutor, metrics, event);
throw new UnsupportedOperationException("CommitTxnEvent is not supported.");
}
@Override
protected void process() throws MetastoreNotificationException {
}
@Override
protected boolean isEventProcessingDisabled() {
return false;
}
@Override
protected SelfEventContext getSelfEventContext() {
return null;
}
@Override
protected boolean shouldSkipWhenSyncingToLatestEventId() {
return false;
}
}
/**
* Get Access type in byte from table property
* @param msTbl hms table
* @return the table access type.
*/
private static byte getAccessType(Table msTbl) {
Preconditions.checkNotNull(msTbl);
byte accessType = ACCESSTYPE_NONE;
Map<String, String> params = msTbl.getParameters();
if (params.containsKey(ACCESSTYPE)) {
String accessTypeStr = msTbl.getParameters().get(ACCESSTYPE);
accessType = accessTypeStr.getBytes()[0];
} else { // Table not created by Impala
if (!capabilitiestSet_) setHiveClientCapabilities();
// The following logic comes from hive's MetastoreDefaultTransformer.transform()
String tableType = msTbl.getTableType();
String tCapabilities = params.get(CatalogOpExecutor.CAPABILITIES_KEY);
int numBuckets = msTbl.isSetSd() ? msTbl.getSd().getNumBuckets() : 0;
boolean isBucketed = numBuckets > 0;
LOG.info("Table " + msTbl.getTableName() + ",#bucket=" + numBuckets + ",isBucketed:"
+ isBucketed + ",tableType=" + tableType + ",tableCapabilities="
+ tCapabilities);
// if the table has no tCapabilities
if (tCapabilities == null) {
LOG.debug("Table has no specific required capabilities");
switch (tableType) {
case "EXTERNAL_TABLE":
if (numBuckets > 0) {
if (processorCapabilities.contains(HIVEBUCKET2)) {
LOG.debug("External bucketed table with HB2 capability:RW");
accessType = ACCESSTYPE_READWRITE;
} else {
LOG.debug("External bucketed table without HB2 capability:RO");
accessType = ACCESSTYPE_READONLY;
}
} else { // Unbucketed
if (processorCapabilities.contains(EXTWRITE) && processorCapabilities
.contains(EXTREAD)) {
LOG.debug("External unbucketed table with EXTREAD/WRITE capability:RW");
accessType = ACCESSTYPE_READWRITE;
} else if (processorCapabilities.contains(EXTREAD)) {
LOG.debug("External unbucketed table with EXTREAD capability:RO");
accessType = ACCESSTYPE_READONLY;
} else {
LOG.debug(
"External unbucketed table without EXTREAD/WRITE capability:NONE");
accessType = ACCESSTYPE_NONE;
}
}
break;
case "MANAGED_TABLE":
String txnal = params.get(AcidUtils.TABLE_IS_TRANSACTIONAL);
if (txnal == null || txnal
.equalsIgnoreCase("FALSE")) { // non-ACID MANAGED table
LOG.debug("Managed non-acid table:RW");
accessType = ACCESSTYPE_READWRITE;
}
if (txnal != null && txnal.equalsIgnoreCase("TRUE")) { // ACID table
String txntype = params.get(AcidUtils.TABLE_TRANSACTIONAL_PROPERTIES);
if (txntype != null && txntype
.equalsIgnoreCase("insert_only")) { // MICRO_MANAGED Tables
// MGD table is insert only, not full ACID
if (processorCapabilities.contains(HIVEMANAGEDINSERTWRITE)
|| processorCapabilities.contains(CONNECTORWRITE)) {
LOG.debug("Managed acid table with INSERTWRITE or CONNECTORWRITE "
+ "capability:RW");
// clients have RW access to INSERT-ONLY ACID tables
accessType = ACCESSTYPE_READWRITE;
LOG.info("Processor has one of the write capabilities on insert-only, "
+ "granting RW");
} else if (processorCapabilities.contains(HIVEMANAGEDINSERTREAD)
|| processorCapabilities.contains(CONNECTORREAD)) {
LOG.debug("Managed acid table with INSERTREAD or CONNECTORREAD "
+ "capability:RO");
// clients have RO access to INSERT-ONLY ACID tables
accessType = ACCESSTYPE_READONLY;
LOG.info("Processor has one of the read capabilities on insert-only, "
+ "granting RO");
} else {
// clients have NO access to INSERT-ONLY ACID tables
accessType = ACCESSTYPE_NONE;
LOG.info("Processor has no read or write capabilities on insert-only, "
+ "NO access");
}
} else { // FULL ACID MANAGED TABLE
if (processorCapabilities.contains(HIVEFULLACIDWRITE)
|| processorCapabilities.contains(CONNECTORWRITE)) {
LOG.debug(
"Full acid table with ACIDWRITE or CONNECTORWRITE capability:RW");
// clients have RW access to IUD ACID tables
accessType = ACCESSTYPE_READWRITE;
} else if (processorCapabilities.contains(HIVEFULLACIDREAD)
|| processorCapabilities.contains(CONNECTORREAD)) {
LOG.debug(
"Full acid table with ACIDREAD or CONNECTORREAD capability:RO");
// clients have RO access to IUD ACID tables
accessType = ACCESSTYPE_READONLY;
} else {
LOG.debug("Full acid table without ACIDREAD/WRITE or "
+ "CONNECTORREAD/WRITE capability:NONE");
// clients have NO access to IUD ACID tables
accessType = ACCESSTYPE_NONE;
}
}
}
break;
case "VIRTUAL_VIEW":
if (processorCapabilities.contains(HIVESQL) ||
processorCapabilities.contains(CONNECTORREAD)) {
accessType = ACCESSTYPE_READONLY;
} else {
accessType = ACCESSTYPE_NONE;
}
break;
case "MATERIALIZED_VIEW":
if ((processorCapabilities.contains(CONNECTORREAD) ||
processorCapabilities.contains(HIVEFULLACIDREAD)) && processorCapabilities
.contains(HIVEMQT)) {
LOG.info(
"Processor has one of the READ abilities and HIVEMQT, AccessType=RO");
accessType = ACCESSTYPE_READONLY;
} else {
LOG.info("Processor has no READ abilities or HIVEMQT, AccessType=None");
accessType = ACCESSTYPE_NONE;
}
break;
default:
accessType = ACCESSTYPE_NONE;
break;
}
return accessType;
}
// WITH CAPABILITIES ON TABLE
tCapabilities = tCapabilities.replaceAll("\\s", "")
.toUpperCase(); // remove spaces between tCapabilities + toUppercase
List<String> requiredCapabilities = Arrays.asList(tCapabilities.split(","));
switch (tableType) {
case "EXTERNAL_TABLE":
if (processorCapabilities.containsAll(requiredCapabilities)) {
// AccessType is RW
LOG.info(
"Abilities for match: Table type=" + tableType + ",accesstype is RW");
accessType = ACCESSTYPE_READWRITE;
break;
}
if (requiredCapabilities.contains(EXTWRITE) && processorCapabilities
.contains(EXTWRITE)) {
if (!isBucketed) {
LOG.info("EXTWRITE Matches, accessType=" + ACCESSTYPE_READWRITE);
accessType = ACCESSTYPE_READWRITE;
return accessType;
}
}
if (requiredCapabilities.contains(EXTREAD) && processorCapabilities
.contains(EXTREAD)) {
LOG.info("EXTREAD Matches, accessType=" + ACCESSTYPE_READONLY);
accessType = ACCESSTYPE_READONLY;
} else {
LOG.debug("No matches, accessType=" + ACCESSTYPE_NONE);
accessType = ACCESSTYPE_NONE;
}
break;
case "MANAGED_TABLE":
if (processorCapabilities.size() == 0) { // processor has no capabilities
LOG.info("Client has no capabilities for type " + tableType
+ ",accesstype is NONE");
accessType = ACCESSTYPE_NONE;
return accessType;
}
if (processorCapabilities.containsAll(requiredCapabilities)) {
// AccessType is RW
LOG.info(
"Abilities for match: Table type=" + tableType + ",accesstype is RW");
accessType = ACCESSTYPE_READWRITE;
return accessType;
}
String txnal = params.get(AcidUtils.TABLE_IS_TRANSACTIONAL);
if (txnal == null || txnal
.equalsIgnoreCase("FALSE")) { // non-ACID MANAGED table
LOG.info("Table is non ACID, accesstype is RO");
accessType = ACCESSTYPE_READONLY;
return accessType;
}
if (txnal != null && txnal.equalsIgnoreCase("TRUE")) { // ACID table
String txntype = params.get(AcidUtils.TABLE_TRANSACTIONAL_PROPERTIES);
List<String> hintList = new ArrayList<>();
if (txntype != null && txntype
.equalsIgnoreCase("insert_only")) { // MICRO_MANAGED Tables
LOG.info("Table is INSERTONLY ACID");
// MGD table is insert only, not full ACID
if (processorCapabilities.containsAll(getWrites(requiredCapabilities))
// contains all writes on table
|| processorCapabilities.contains(HIVEFULLACIDWRITE)) {
LOG.info("Processor has all writes or full acid write, access is RW");
// clients have RW access to INSERT-ONLY ACID tables
accessType = ACCESSTYPE_READWRITE;
return accessType;
}
if (processorCapabilities.contains(CONNECTORWRITE)) {
LOG.debug("Managed acid table with CONNECTORWRITE capability:RW");
// clients have RW access to INSERT-ONLY ACID tables with CONNWRITE
accessType = ACCESSTYPE_READWRITE;
return accessType;
} else if (processorCapabilities.containsAll(getReads(requiredCapabilities))
|| processorCapabilities.contains(HIVEMANAGEDINSERTREAD)) {
LOG.debug("Managed acid table with MANAGEDREAD capability:RO");
accessType = ACCESSTYPE_READONLY;
return accessType;
} else if (processorCapabilities.contains(CONNECTORREAD)) {
LOG.debug("Managed acid table with CONNECTORREAD capability:RO");
accessType = ACCESSTYPE_READONLY;
return accessType;
} else {
LOG.debug("Managed acid table without any READ capability:NONE");
accessType = ACCESSTYPE_NONE;
return accessType;
}
} else { // MANAGED FULL ACID TABLES
LOG.info("Table is FULLACID");
if (processorCapabilities.containsAll(getWrites(requiredCapabilities))
// contains all writes on table
|| processorCapabilities.contains(HIVEFULLACIDWRITE)) {
LOG.info("Processor has all writes or atleast " + HIVEFULLACIDWRITE
+ ", access is RW");
// clients have RW access to ACID tables
accessType = ACCESSTYPE_READWRITE;
return accessType;
}
if (processorCapabilities.contains(CONNECTORWRITE)) {
LOG.debug("Full acid table with CONNECTORWRITE capability:RW");
// clients have RW access to IUD ACID tables
accessType = ACCESSTYPE_READWRITE;
return accessType;
} else if (processorCapabilities.contains(HIVEFULLACIDREAD)
|| (processorCapabilities.contains(CONNECTORREAD))) {
LOG.debug("Full acid table with CONNECTORREAD/ACIDREAD capability:RO");
// clients have RO access to IUD ACID tables
accessType = ACCESSTYPE_READONLY;
return accessType;
} else {
LOG.debug("Full acid table without READ capability:RO");
// clients have NO access to IUD ACID tables
accessType = ACCESSTYPE_NONE;
return accessType;
}
}
}
break;
case "VIRTUAL_VIEW":
case "MATERIALIZED_VIEW":
if (processorCapabilities.containsAll(requiredCapabilities)) {
accessType = ACCESSTYPE_READONLY;
} else {
accessType = ACCESSTYPE_NONE;
}
break;
default:
accessType = ACCESSTYPE_NONE;
break;
}
}
return accessType;
}
private static List<String> getWrites(List<String> capabilities) {
List<String> writes = new ArrayList<>();
for (String capability : capabilities) {
if (capability.toUpperCase().endsWith("WRITE") ||
capability.toUpperCase().endsWith("STATS") ||
capability.toUpperCase().endsWith("INVALIDATE")) {
writes.add(capability);
}
}
return writes;
}
private static List<String> getReads(List<String> capabilities) {
List<String> reads = new ArrayList<>();
for (String capability : capabilities) {
if (capability.toUpperCase().endsWith("READ") ||
capability.toUpperCase().endsWith("SQL")) {
reads.add(capability);
}
}
return reads;
}
}

View File

@@ -66,9 +66,9 @@ import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEventsProcessor.EventProcessorStatus;
import org.apache.impala.catalog.events.SelfEventContext;
import org.apache.impala.catalog.metastore.CatalogHmsUtils;
import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.catalog.monitor.CatalogTableMetrics;
import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
import org.apache.impala.catalog.metastore.HmsApiNameEnum;
import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
import org.apache.impala.common.FileSystemUtil;
@@ -2184,13 +2184,13 @@ public class CatalogServiceCatalog extends Catalog {
if (tbl instanceof HdfsTable
&& AcidUtils.compare((HdfsTable) tbl, validWriteIdList, tableId) >= 0) {
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getCounter(CatalogMetastoreServer.CATALOGD_CACHE_HIT_METRIC)
.getCounter(CatalogHmsUtils.CATALOGD_CACHE_HIT_METRIC)
.inc();
// Update the cache stats for a HMS API from which the current method got invoked.
if (HmsApiNameEnum.contains(reason)) {
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getCounter(String
.format(CatalogMetastoreServer.CATALOGD_CACHE_API_HIT_METRIC, reason))
.format(CatalogHmsUtils.CATALOGD_CACHE_API_HIT_METRIC, reason))
.inc();
}
// Check if any partition of the table has a newly compacted file.
@@ -2213,7 +2213,7 @@ public class CatalogServiceCatalog extends Catalog {
if (partsToBeRefreshed.isEmpty()) return tbl;
} else {
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getCounter(CatalogMetastoreServer.CATALOGD_CACHE_MISS_METRIC)
.getCounter(CatalogHmsUtils.CATALOGD_CACHE_MISS_METRIC)
.inc();
// Update the cache stats for a HMS API from which the current method got invoked.
if (HmsApiNameEnum.contains(reason)) {
@@ -2221,7 +2221,7 @@ public class CatalogServiceCatalog extends Catalog {
// have to reload the table.
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getCounter(String.format(
CatalogMetastoreServer.CATALOGD_CACHE_API_MISS_METRIC, reason))
CatalogHmsUtils.CATALOGD_CACHE_API_MISS_METRIC, reason))
.inc();
}
previousCatalogVersion = tbl.getCatalogVersion();

View File

@@ -2773,7 +2773,8 @@ public class HdfsTable extends Table implements FeFsTable {
// only reload partitions that have more recent write id
if (hdfsPartition != null &&
(!AcidUtils.isTransactionalTable(msTable_.getParameters())
|| hdfsPartition.getWriteId() <= partition.getWriteId())) {
|| hdfsPartition.getWriteId() <= MetastoreShim
.getWriteIdFromMSPartition(partition))) {
hmsPartToHdfsPart.put(partition, hdfsPartition);
}
}

View File

@@ -0,0 +1,785 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_TABLE;
import static org.apache.impala.service.MetadataOp.TABLE_TYPE_VIEW;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo;
import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
import org.apache.hive.service.rpc.thrift.TGetColumnsReq;
import org.apache.hive.service.rpc.thrift.TGetFunctionsReq;
import org.apache.hive.service.rpc.thrift.TGetSchemasReq;
import org.apache.hive.service.rpc.thrift.TGetTablesReq;
import org.apache.impala.authorization.User;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
import org.apache.impala.common.TransactionException;
import org.apache.impala.service.Frontend;
import org.apache.impala.service.MetadataOp;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
import org.apache.impala.util.HiveMetadataFormatUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Base class for Hive 3 MetastoreShim.
*/
public class Hive3MetastoreShimBase {
private static final Logger LOG = LoggerFactory.getLogger(Hive3MetastoreShimBase.class);
protected static final String EXTWRITE = "EXTWRITE";
protected static final String EXTREAD = "EXTREAD";
protected static final String HIVEBUCKET2 = "HIVEBUCKET2";
protected static final String HIVEFULLACIDREAD = "HIVEFULLACIDREAD";
protected static final String HIVEFULLACIDWRITE = "HIVEFULLACIDWRITE";
protected static final String HIVEMANAGEDINSERTREAD = "HIVEMANAGEDINSERTREAD";
protected static final String HIVEMANAGEDINSERTWRITE = "HIVEMANAGEDINSERTWRITE";
protected static final String HIVEMANAGESTATS = "HIVEMANAGESTATS";
// Materialized View
protected static final String HIVEMQT = "HIVEMQT";
// Virtual View
protected static final String HIVESQL = "HIVESQL";
protected static final long MAJOR_VERSION = 3;
protected static boolean capabilitiestSet_ = false;
// Number of retries to acquire an HMS ACID lock.
private static final int LOCK_RETRIES = 10;
// Time interval between retries of acquiring an HMS ACID lock
private static final int LOCK_RETRY_WAIT_SECONDS = 3;
protected final static String HMS_RPC_ERROR_FORMAT_STR =
"Error making '%s' RPC to Hive Metastore: ";
// Id used to register transactions / locks.
// Not final, as it makes sense to set it based on role + instance, see IMPALA-8853.
public static String TRANSACTION_USER_ID = "Impala";
/**
* Initializes and returns a TblTransaction object for table 'tbl'. Opens a new
* transaction if txnId is not valid.
*/
public static TblTransaction createTblTransaction(
IMetaStoreClient client, Table tbl, long txnId)
throws TransactionException {
TblTransaction tblTxn = new TblTransaction();
try {
if (txnId <= 0) {
txnId = openTransaction(client);
tblTxn.ownsTxn = true;
}
tblTxn.txnId = txnId;
tblTxn.writeId =
allocateTableWriteId(client, txnId, tbl.getDbName(), tbl.getTableName());
tblTxn.validWriteIds =
getValidWriteIdListInTxn(client, tbl.getDbName(), tbl.getTableName(), txnId);
return tblTxn;
} catch (TException e) {
if (tblTxn.ownsTxn) {
abortTransactionNoThrow(client, tblTxn.txnId);
}
throw new TransactionException(
String.format(HMS_RPC_ERROR_FORMAT_STR, "createTblTransaction"), e);
}
}
static public void commitTblTransactionIfNeeded(IMetaStoreClient client,
TblTransaction tblTxn) throws TransactionException {
if (tblTxn.ownsTxn) {
commitTransaction(client, tblTxn.txnId);
}
}
static public void abortTblTransactionIfNeeded(IMetaStoreClient client,
TblTransaction tblTxn) {
if (tblTxn.ownsTxn) {
abortTransactionNoThrow(client, tblTxn.txnId);
}
}
/**
* Constant variable that stores engine value needed to store / access Impala column
* statistics.
*/
public static final String IMPALA_ENGINE = "impala";
/**
* Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
*/
public static boolean validateName(String name) {
return MetaStoreUtils.validateName(name, null);
}
/**
* Wrapper around IMetaStoreClient.alter_partition() to deal with added arguments.
*/
public static void alterPartition(IMetaStoreClient client, Partition partition)
throws InvalidOperationException, MetaException, TException {
client.alter_partition(
partition.getDbName(), partition.getTableName(), partition, null);
}
/**
* Wrapper around IMetaStoreClient.alter_partitions() to deal with added arguments.
*/
public static void alterPartitions(IMetaStoreClient client, String dbName,
String tableName, List<Partition> partitions)
throws InvalidOperationException, MetaException, TException {
client.alter_partitions(dbName, tableName, partitions, null);
}
/**
* Wrapper around IMetaStoreClient.createTableWithConstraints() to deal with added
* arguments. The four new Hive arguments are uniqueConstraints, notNullConstraints,
* defaultConstraints, and checkConstraints.
*/
public static void createTableWithConstraints(IMetaStoreClient client,
Table newTbl, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys)
throws InvalidOperationException, MetaException, TException {
client.createTableWithConstraints(newTbl, primaryKeys, foreignKeys, null, null,
null, null);
}
/**
* Wrapper around MetaStoreUtils.updatePartitionStatsFast() to deal with added
* arguments.
*/
public static void updatePartitionStatsFast(Partition partition, Table tbl,
Warehouse warehouse) throws MetaException {
MetaStoreUtils.updatePartitionStatsFast(partition, tbl, warehouse, /*madeDir*/false,
/*forceRecompute*/false,
/*environmentContext*/null, /*isCreate*/false);
}
/**
* Return the maximum number of Metastore objects that should be retrieved in a batch.
*/
public static String metastoreBatchRetrieveObjectsMaxConfigKey() {
return MetastoreConf.ConfVars.BATCH_RETRIEVE_OBJECTS_MAX.toString();
}
/**
* Return the key and value that should be set in the partition parameters to mark that
* the stats were generated automatically by a stats task.
*/
public static Pair<String, String> statsGeneratedViaStatsTaskParam() {
return Pair.create(StatsSetupConst.STATS_GENERATED, StatsSetupConst.TASK);
}
public static TResultSet execGetFunctions(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetFunctionsReq req = request.getGet_functions_req();
return MetadataOp.getFunctions(
frontend, req.getCatalogName(), req.getSchemaName(), req.getFunctionName(), user);
}
public static TResultSet execGetColumns(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetColumnsReq req = request.getGet_columns_req();
return MetadataOp.getColumns(frontend, req.getCatalogName(), req.getSchemaName(),
req.getTableName(), req.getColumnName(), user);
}
public static TResultSet execGetTables(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetTablesReq req = request.getGet_tables_req();
return MetadataOp.getTables(frontend, req.getCatalogName(), req.getSchemaName(),
req.getTableName(), req.getTableTypes(), user);
}
public static TResultSet execGetSchemas(
Frontend frontend, TMetadataOpRequest request, User user) throws ImpalaException {
TGetSchemasReq req = request.getGet_schemas_req();
return MetadataOp.getSchemas(
frontend, req.getCatalogName(), req.getSchemaName(), user);
}
/**
* Supported HMS-3 types
*/
public static final EnumSet<TableType> IMPALA_SUPPORTED_TABLE_TYPES = EnumSet
.of(TableType.EXTERNAL_TABLE, TableType.MANAGED_TABLE, TableType.VIRTUAL_VIEW,
TableType.MATERIALIZED_VIEW);
/**
* mapping between the HMS-3 types and the Impala types
*/
public static final ImmutableMap<String, String> HMS_TO_IMPALA_TYPE =
new ImmutableMap.Builder<String, String>()
.put("EXTERNAL_TABLE", TABLE_TYPE_TABLE)
.put("MANAGED_TABLE", TABLE_TYPE_TABLE)
.put("INDEX_TABLE", TABLE_TYPE_TABLE)
.put("VIRTUAL_VIEW", TABLE_TYPE_VIEW)
.put("MATERIALIZED_VIEW", TABLE_TYPE_VIEW).build();
// hive-3 introduces a catalog object in hive
// Impala only supports the default catalog of hive
private static final String defaultCatalogName_ = MetaStoreUtils
.getDefaultCatalog(MetastoreConf.newMetastoreConf());
/**
* Gets the name of the default catalog from metastore configuration.
*/
public static String getDefaultCatalogName() {
return defaultCatalogName_;
}
/**
* Wrapper around FileUtils.makePartName to deal with package relocation in Hive 3. This
* method uses the metastore's FileUtils method instead of one from hive-exec
*
* @param partitionColNames
* @param values
* @return
*/
public static String makePartName(List<String> partitionColNames, List<String> values) {
return FileUtils.makePartName(partitionColNames, values);
}
/**
* Wrapper method to get the formatted string to represent the columns information of a
* metastore table. This method was changed in Hive-3 significantly when compared to
* Hive-2. In order to avoid adding unnecessary dependency to hive-exec this method
* copies the source code from hive-2's MetaDataFormatUtils class for this method.
* TODO : In order to avoid this copy, we could move this code from hive's ql module
* to a util method in MetastoreUtils in metastore module
*
* @return
*/
public static String getAllColumnsInformation(List<FieldSchema> tabCols,
List<FieldSchema> partitionCols, boolean printHeader, boolean isOutputPadded,
boolean showPartColsSeparately) {
return HiveMetadataFormatUtils
.getAllColumnsInformation(tabCols, partitionCols, printHeader, isOutputPadded,
showPartColsSeparately);
}
/**
* Wrapper method around Hive's MetadataFormatUtils.getTableInformation which has
* changed significantly in Hive-3
*
* @return
*/
public static String getTableInformation(Table table) {
return HiveMetadataFormatUtils.getTableInformation(table, false);
}
/**
* Wrapper method around Hive-3's MetadataFormatUtils.getConstraintsInformation
*
* @return
*/
public static String getConstraintsInformation(PrimaryKeyInfo pkInfo,
ForeignKeyInfo fkInfo) {
return HiveMetadataFormatUtils.getConstraintsInformation(pkInfo, fkInfo);
}
/**
* This method has been copied from BaseSemanticAnalyzer class of Hive and is fairly
* stable now (last change was in mid 2016 as of April 2019). Copying is preferred over
* adding dependency to this class which pulls in a lot of other transitive dependencies
* from hive-exec
*/
public static String unescapeSQLString(String stringLiteral) {
{
Character enclosure = null;
// Some of the strings can be passed in as unicode. For example, the
// delimiter can be passed in as \002 - So, we first check if the
// string is a unicode number, else go back to the old behavior
StringBuilder sb = new StringBuilder(stringLiteral.length());
for (int i = 0; i < stringLiteral.length(); i++) {
char currentChar = stringLiteral.charAt(i);
if (enclosure == null) {
if (currentChar == '\'' || stringLiteral.charAt(i) == '\"') {
enclosure = currentChar;
}
// ignore all other chars outside the enclosure
continue;
}
if (enclosure.equals(currentChar)) {
enclosure = null;
continue;
}
if (currentChar == '\\' && (i + 6 < stringLiteral.length())
&& stringLiteral.charAt(i + 1) == 'u') {
int code = 0;
int base = i + 2;
for (int j = 0; j < 4; j++) {
int digit = Character.digit(stringLiteral.charAt(j + base), 16);
code = (code << 4) + digit;
}
sb.append((char) code);
i += 5;
continue;
}
if (currentChar == '\\' && (i + 4 < stringLiteral.length())) {
char i1 = stringLiteral.charAt(i + 1);
char i2 = stringLiteral.charAt(i + 2);
char i3 = stringLiteral.charAt(i + 3);
if ((i1 >= '0' && i1 <= '1') && (i2 >= '0' && i2 <= '7')
&& (i3 >= '0' && i3 <= '7')) {
byte bVal = (byte) ((i3 - '0') + ((i2 - '0') * 8) + ((i1 - '0') * 8 * 8));
byte[] bValArr = new byte[1];
bValArr[0] = bVal;
String tmp = new String(bValArr);
sb.append(tmp);
i += 3;
continue;
}
}
if (currentChar == '\\' && (i + 2 < stringLiteral.length())) {
char n = stringLiteral.charAt(i + 1);
switch (n) {
case '0':
sb.append("\0");
break;
case '\'':
sb.append("'");
break;
case '"':
sb.append("\"");
break;
case 'b':
sb.append("\b");
break;
case 'n':
sb.append("\n");
break;
case 'r':
sb.append("\r");
break;
case 't':
sb.append("\t");
break;
case 'Z':
sb.append("\u001A");
break;
case '\\':
sb.append("\\");
break;
// The following 2 lines are exactly what MySQL does TODO: why do we do this?
case '%':
sb.append("\\%");
break;
case '_':
sb.append("\\_");
break;
default:
sb.append(n);
}
i++;
} else {
sb.append(currentChar);
}
}
return sb.toString();
}
}
/**
* Get ValidWriteIdList object by given string
*
* @param validWriteIds ValidWriteIdList object in String
* @return ValidWriteIdList object
*/
public static ValidWriteIdList getValidWriteIdListFromString(String validWriteIds) {
Preconditions.checkNotNull(validWriteIds);
return new ValidReaderWriteIdList(validWriteIds);
}
/**
* Converts a TValidWriteIdList object to ValidWriteIdList.
*
* @param tableName the name of the table.
* @param validWriteIds the thrift object.
* @return ValidWriteIdList object
*/
public static ValidWriteIdList getValidWriteIdListFromThrift(String tableName,
TValidWriteIdList validWriteIds) {
Preconditions.checkNotNull(validWriteIds);
BitSet abortedBits;
if (validWriteIds.getAborted_indexesSize() > 0) {
abortedBits = new BitSet(validWriteIds.getInvalid_write_idsSize());
for (int aborted_index : validWriteIds.getAborted_indexes()) {
abortedBits.set(aborted_index);
}
} else {
abortedBits = new BitSet();
}
long highWatermark = validWriteIds.isSetHigh_watermark() ?
validWriteIds.high_watermark : Long.MAX_VALUE;
long minOpenWriteId = validWriteIds.isSetMin_open_write_id() ?
validWriteIds.min_open_write_id : Long.MAX_VALUE;
return new ValidReaderWriteIdList(tableName,
validWriteIds.getInvalid_write_ids().stream().mapToLong(i -> i).toArray(),
abortedBits, highWatermark, minOpenWriteId);
}
/**
* Converts a ValidWriteIdList object to TValidWriteIdList.
*/
public static TValidWriteIdList convertToTValidWriteIdList(
ValidWriteIdList validWriteIdList) {
Preconditions.checkNotNull(validWriteIdList);
TValidWriteIdList ret = new TValidWriteIdList();
long minOpenWriteId = validWriteIdList.getMinOpenWriteId() != null ?
validWriteIdList.getMinOpenWriteId() : Long.MAX_VALUE;
ret.setHigh_watermark(validWriteIdList.getHighWatermark());
ret.setMin_open_write_id(minOpenWriteId);
ret.setInvalid_write_ids(Arrays.stream(
validWriteIdList.getInvalidWriteIds()).boxed().collect(Collectors.toList()));
List<Integer> abortedIndexes = new ArrayList<>();
for (int i = 0; i < validWriteIdList.getInvalidWriteIds().length; ++i) {
long writeId = validWriteIdList.getInvalidWriteIds()[i];
if (validWriteIdList.isWriteIdAborted(writeId)) {
abortedIndexes.add(i);
}
}
ret.setAborted_indexes(abortedIndexes);
return ret;
}
/**
* Returns a ValidTxnList object that helps to identify in-progress and aborted
* transactions.
*/
public static ValidTxnList getValidTxns(IMetaStoreClient client) throws TException {
return client.getValidTxns();
}
/**
* Gets the validWriteIds of the given table, serialized as a string, as seen within the given transaction.
*/
private static String getValidWriteIdListInTxn(IMetaStoreClient client, String dbName,
String tblName, long txnId)
throws TException {
ValidTxnList txns = client.getValidTxns(txnId);
String tableFullName = dbName + "." + tblName;
List<TableValidWriteIds> writeIdsObj = client.getValidWriteIds(
Lists.newArrayList(tableFullName), txns.toString());
ValidTxnWriteIdList validTxnWriteIdList = new ValidTxnWriteIdList(txnId);
for (TableValidWriteIds tableWriteIds : writeIdsObj) {
validTxnWriteIdList.addTableValidWriteIdList(
createValidReaderWriteIdList(tableWriteIds));
}
String validWriteIds =
validTxnWriteIdList.getTableValidWriteIdList(tableFullName).writeToString();
return validWriteIds;
}
/**
* Opens a new transaction. Sets userId to TRANSACTION_USER_ID.
*
* @param client is the HMS client to be used.
* @return the new transaction id.
* @throws TransactionException
*/
public static long openTransaction(IMetaStoreClient client)
throws TransactionException {
try {
return client.openTxn(TRANSACTION_USER_ID);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Commits a transaction.
*
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @throws TransactionException
*/
public static void commitTransaction(IMetaStoreClient client, long txnId)
throws TransactionException {
try {
client.commitTxn(txnId);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Aborts a transaction.
*
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @throws TransactionException
*/
public static void abortTransaction(IMetaStoreClient client, long txnId)
throws TransactionException {
try {
client.abortTxns(Arrays.asList(txnId));
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Heartbeats a transaction and/or lock to keep them alive.
*
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @param lockId is the lock id.
* @return true on success, false if the transaction or lock no longer exists.
* @throws TransactionException In case of any other failures.
*/
public static boolean heartbeat(IMetaStoreClient client,
long txnId, long lockId) throws TransactionException {
String errorMsg = "Caught exception during heartbeating transaction " +
String.valueOf(txnId) + " lock " + String.valueOf(lockId);
LOG.info("Sending heartbeat for transaction " + String.valueOf(txnId) +
" lock " + String.valueOf(lockId));
try {
client.heartbeat(txnId, lockId);
} catch (NoSuchLockException e) {
LOG.info(errorMsg, e);
return false;
} catch (NoSuchTxnException e) {
LOG.info(errorMsg, e);
return false;
} catch (TxnAbortedException e) {
LOG.info(errorMsg, e);
return false;
} catch (TException e) {
throw new TransactionException(e.getMessage());
}
return true;
}
/**
* Creates a lock for the given lock components. Returns the acquired lock; this might
* involve some waiting.
*
* @param client is the HMS client to be used.
* @param txnId The transaction ID associated with the lock. Zero if the lock
* doesn't belong to a transaction.
* @param lockComponents the lock components to include in this lock.
* @return the lock id
* @throws TransactionException in case of failure
*/
public static long acquireLock(IMetaStoreClient client, long txnId,
List<LockComponent> lockComponents)
throws TransactionException {
LockRequestBuilder lockRequestBuilder = new LockRequestBuilder();
lockRequestBuilder.setUser(TRANSACTION_USER_ID);
if (txnId > 0) {
lockRequestBuilder.setTransactionId(txnId);
}
for (LockComponent lockComponent : lockComponents) {
lockRequestBuilder.addLockComponent(lockComponent);
}
LockRequest lockRequest = lockRequestBuilder.build();
try {
LockResponse lockResponse = client.lock(lockRequest);
long lockId = lockResponse.getLockid();
int retries = 0;
while (lockResponse.getState() == LockState.WAITING && retries < LOCK_RETRIES) {
try {
//TODO: add profile counter for lock waits.
LOG.info("Waiting " + String.valueOf(LOCK_RETRY_WAIT_SECONDS) +
" seconds for lock " + String.valueOf(lockId) + " of transaction " +
Long.toString(txnId));
Thread.sleep(LOCK_RETRY_WAIT_SECONDS * 1000);
++retries;
lockResponse = client.checkLock(lockId);
} catch (InterruptedException e) {
// Since the wait time and number of retries are configurable, it wouldn't
// add much value to make acquireLock() interruptible, so we just swallow
// the exception here.
}
}
if (lockResponse.getState() == LockState.ACQUIRED) {
return lockId;
}
if (lockId > 0) {
try {
releaseLock(client, lockId);
} catch (TransactionException te) {
LOG.error("Failed to release lock as a cleanup step after acquiring a lock " +
"has failed: " + lockId + " " + te.getMessage());
}
}
throw new TransactionException("Failed to acquire lock for transaction " +
String.valueOf(txnId));
} catch (TException e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Releases a lock in HMS.
*
* @param client is the HMS client to be used.
* @param lockId is the lock ID to be released.
* @throws TransactionException
*/
public static void releaseLock(IMetaStoreClient client, long lockId)
throws TransactionException {
try {
client.unlock(lockId);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* Aborts a transaction and logs the error if there is an exception.
*
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
*/
public static void abortTransactionNoThrow(IMetaStoreClient client, long txnId) {
try {
client.abortTxns(Arrays.asList(txnId));
} catch (Exception e) {
LOG.error("Error in abortTxns.", e);
}
}
/**
* Allocates a write id for the given table.
*
* @param client is the HMS client to be used.
* @param txnId is the transaction id.
* @param dbName is the database name.
* @param tableName is the target table name.
* @return the allocated write id.
* @throws TransactionException
*/
public static long allocateTableWriteId(IMetaStoreClient client, long txnId,
String dbName, String tableName) throws TransactionException {
try {
return client.allocateTableWriteId(txnId, dbName, tableName);
} catch (Exception e) {
throw new TransactionException(e.getMessage());
}
}
/**
* @return the hive major version
*/
public static long getMajorVersion() {
return MAJOR_VERSION;
}
/**
* Borrowed code from hive. This assumes that the caller intends to read the files, and
* thus treats both open and aborted write ids as invalid.
*
* @param tableWriteIds valid write ids for the given table from the metastore
* @return a valid write IDs list for the input table
*/
private static ValidReaderWriteIdList createValidReaderWriteIdList(
TableValidWriteIds tableWriteIds) {
String fullTableName = tableWriteIds.getFullTableName();
long highWater = tableWriteIds.getWriteIdHighWaterMark();
List<Long> invalids = tableWriteIds.getInvalidWriteIds();
BitSet abortedBits = BitSet.valueOf(tableWriteIds.getAbortedBits());
long[] exceptions = new long[invalids.size()];
int i = 0;
for (long writeId : invalids) {
exceptions[i++] = writeId;
}
if (tableWriteIds.isSetMinOpenWriteId()) {
return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits,
highWater, tableWriteIds.getMinOpenWriteId());
} else {
return new ValidReaderWriteIdList(fullTableName, exceptions, abortedBits,
highWater);
}
}
/**
* Return the default table path for a new table.
* <p>
* Hive-3 doesn't allow managed tables to be non-transactional after HIVE-22158. Creating
* a non-transactional managed table ultimately results in an external table with the
* table property "external.table.purge" set to true. As the table type becomes EXTERNAL,
* the location will be under "metastore.warehouse.external.dir" (HIVE-19837, introduced
* in hive-2.7, not in hive-2.1.x-cdh6.x yet).
*/
public static String getPathForNewTable(Database db, Table tbl)
throws MetaException {
Warehouse wh = new Warehouse(new HiveConf());
// Non-transactional tables are all translated to external tables by HMS's default
// transformer (HIVE-22158). Note that external tables can't be transactional.
// So the requests and results of the default transformer are:
// non transactional managed table => external table
// non transactional external table => external table
// transactional managed table => managed table
// transactional external table (not allowed)
boolean isExternal = !AcidUtils.isTransactionalTable(tbl.getParameters());
// TODO(IMPALA-9088): deal with customized transformer in HMS.
return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal)
.toString();
}
}
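
A minimal usage sketch of the transaction helpers above (illustrative only, not part of this change; client, lockComponents and the database/table names are placeholders for values the caller already has):

long txnId = MetastoreShim.openTransaction(client);
try {
  long lockId = MetastoreShim.acquireLock(client, txnId, lockComponents);
  long writeId = MetastoreShim.allocateTableWriteId(client, txnId, "some_db", "some_tbl");
  // ... write data under writeId, calling MetastoreShim.heartbeat(client, txnId, lockId)
  // periodically so the transaction and lock stay alive ...
  MetastoreShim.commitTransaction(client, txnId);
} catch (TransactionException e) {
  MetastoreShim.abortTransactionNoThrow(client, txnId);
}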

View File

@@ -34,17 +34,14 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.GetAllWriteEventInfoRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.json.JSONAlterDatabaseMessage;
@@ -61,7 +58,6 @@ import org.apache.impala.catalog.DatabaseNotFoundException;
import org.apache.impala.catalog.Db;
import org.apache.impala.catalog.FileMetadataLoadOpts;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.TableNotFoundException;
@@ -77,7 +73,6 @@ import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.thrift.TPartitionKeyValue;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.AcidUtils;
import org.apache.thrift.TException;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
@@ -192,7 +187,7 @@ public class MetastoreEvents {
case ALLOC_WRITE_ID_EVENT:
return new AllocWriteIdEvent(catalogOpExecutor_, metrics, event);
case COMMIT_TXN:
return new CommitTxnEvent(catalogOpExecutor_, metrics, event);
return new MetastoreShim.CommitTxnEvent(catalogOpExecutor_, metrics, event);
case ABORT_TXN:
return new AbortTxnEvent(catalogOpExecutor_, metrics, event);
}
@@ -458,7 +453,7 @@ public class MetastoreEvents {
// metrics registry so that events can add metrics
protected final Metrics metrics_;
MetastoreEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
protected MetastoreEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
NotificationEvent event) {
this.catalogOpExecutor_ = catalogOpExecutor;
this.catalog_ = catalogOpExecutor_.getCatalog();
@@ -1749,7 +1744,8 @@ public class MetastoreEvents {
.getDropDatabaseMessage(event.getMessage());
try {
droppedDatabase_ =
Preconditions.checkNotNull(dropDatabaseMessage.getDatabaseObject());
Preconditions
.checkNotNull(MetastoreShim.getDatabaseObject(dropDatabaseMessage));
} catch (Exception e) {
throw new MetastoreNotificationException(debugString(
"Database object is null in the event. "
@@ -2400,130 +2396,6 @@ public class MetastoreEvents {
}
}
/**
* Metastore event handler for COMMIT_TXN events. Handles commit event for transactional
* tables.
*/
public static class CommitTxnEvent extends MetastoreEvent {
private final CommitTxnMessage commitTxnMessage_;
private final long txnId_;
CommitTxnEvent(CatalogOpExecutor catalogOpExecutor, Metrics metrics,
NotificationEvent event) {
super(catalogOpExecutor, metrics, event);
Preconditions.checkState(getEventType().equals(MetastoreEventType.COMMIT_TXN));
Preconditions.checkNotNull(event.getMessage());
commitTxnMessage_ = MetastoreEventsProcessor.getMessageDeserializer()
.getCommitTxnMessage(event.getMessage());
txnId_ = commitTxnMessage_.getTxnId();
}
@Override
protected void process() throws MetastoreNotificationException {
// To ensure no memory leaking in case an exception is thrown, we remove entries
// at first.
Set<TableWriteId> committedWriteIds = catalog_.removeWriteIds(txnId_);
// Via getAllWriteEventInfo, we can get data insertion info for transactional tables
// even though there are no insert events generated for transactional tables. Note
// that we cannot get DDL info from this API.
List<WriteEventInfo> writeEventInfoList;
try (MetaStoreClientPool.MetaStoreClient client = catalog_.getMetaStoreClient()) {
writeEventInfoList = client.getHiveClient().getAllWriteEventInfo(
new GetAllWriteEventInfoRequest(txnId_));
} catch (TException e) {
throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+ "get write event infos for txn {}. Event processing cannot continue. Issue "
+ "an invalidate metadata command to reset event processor.", txnId_), e);
}
try {
if (writeEventInfoList != null && !writeEventInfoList.isEmpty()) {
commitTxnMessage_.addWriteEventInfo(writeEventInfoList);
addCommittedWriteIdsAndRefreshPartitions();
}
// committed write ids for DDL need to be added here
addCommittedWriteIdsToTables(committedWriteIds);
} catch (Exception e) {
throw new MetastoreNotificationNeedsInvalidateException(debugString("Failed to "
+ "mark committed write ids and refresh partitions for txn {}. Event "
+ "processing cannot continue. Issue an invalidate metadata command to reset "
+ "event processor.", txnId_), e);
}
}
private void addCommittedWriteIdsToTables(Set<TableWriteId> tableWriteIds)
throws CatalogException {
for (TableWriteId tableWriteId: tableWriteIds) {
catalog_.addWriteIdsToTable(tableWriteId.getDbName(), tableWriteId.getTblName(),
getEventId(),
Collections.singletonList(tableWriteId.getWriteId()),
MutableValidWriteIdList.WriteIdStatus.COMMITTED);
}
}
private void addCommittedWriteIdsAndRefreshPartitions() throws Exception {
Preconditions.checkNotNull(commitTxnMessage_.getWriteIds());
List<Long> writeIds = Collections.unmodifiableList(commitTxnMessage_.getWriteIds());
List<Partition> parts = new ArrayList<>();
// To load partitions together for the same table, indexes are grouped by table name
Map<TableName, List<Integer>> tableNameToIdxs = new HashMap<>();
for (int i = 0; i < writeIds.size(); i++) {
org.apache.hadoop.hive.metastore.api.Table tbl = commitTxnMessage_.getTableObj(i);
TableName tableName = new TableName(tbl.getDbName(), tbl.getTableName());
parts.add(commitTxnMessage_.getPartitionObj(i));
tableNameToIdxs.computeIfAbsent(tableName, k -> new ArrayList<>()).add(i);
}
for (Map.Entry<TableName, List<Integer>> entry : tableNameToIdxs.entrySet()) {
org.apache.hadoop.hive.metastore.api.Table tbl =
commitTxnMessage_.getTableObj(entry.getValue().get(0));
List<Long> writeIdsForTable = entry.getValue().stream()
.map(i -> writeIds.get(i))
.collect(Collectors.toList());
List<Partition> partsForTable = entry.getValue().stream()
.map(i -> parts.get(i))
.collect(Collectors.toList());
if (tbl.getPartitionKeysSize() > 0
&& !MetaStoreUtils.isMaterializedViewTable(tbl)) {
try {
catalogOpExecutor_.addCommittedWriteIdsAndReloadPartitionsIfExist(
getEventId(), entry.getKey().getDb(), entry.getKey().getTbl(),
writeIdsForTable, partsForTable, "Processing event id: " +
getEventId() + ", event type: " + getEventType());
} catch (TableNotLoadedException e) {
debugLog("Ignoring reloading since table {} is not loaded",
entry.getKey());
} catch (DatabaseNotFoundException | TableNotFoundException e) {
debugLog("Ignoring reloading since table {} is not found",
entry.getKey());
}
} else {
catalog_.reloadTableIfExists(entry.getKey().getDb(), entry.getKey().getTbl(),
"CommitTxnEvent", getEventId());
}
}
}
@Override
protected boolean isEventProcessingDisabled() {
return false;
}
@Override
protected SelfEventContext getSelfEventContext() {
throw new UnsupportedOperationException("Self-event evaluation is not needed for "
+ "this event type");
}
/*
Not skipping the event since there can be multiple tables involved. The actual
processing of event would skip or process the event on a table by table basis
*/
@Override
protected boolean shouldSkipWhenSyncingToLatestEventId() {
return false;
}
}
/**
* Metastore event handler for ABORT_TXN events. Handles abort event for transactional
* tables.

View File

@@ -297,8 +297,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
NotificationEventRequest eventRequest = new NotificationEventRequest();
eventRequest.setMaxEvents(batchSize);
eventRequest.setLastEvent(currentEventId);
NotificationEventResponse notificationEventResponse = msc.getHiveClient()
.getThriftClient().get_next_notification(eventRequest);
NotificationEventResponse notificationEventResponse = MetastoreShim
.getNextNotification(msc.getHiveClient(), eventRequest);
for (NotificationEvent event : notificationEventResponse.getEvents()) {
// if no filter is provided we add all the events
if (filter == null || filter.accept(event)) result.add(event);
@@ -782,8 +782,8 @@ public class MetastoreEventsProcessor implements ExternalEventsProcessor {
NotificationEventRequest eventRequest = new NotificationEventRequest();
eventRequest.setLastEvent(eventId);
eventRequest.setMaxEvents(batchSize);
NotificationEventResponse response = msClient.getHiveClient().getThriftClient()
.get_next_notification(eventRequest);
NotificationEventResponse response = MetastoreShim
.getNextNotification(msClient.getHiveClient(), eventRequest);
LOG.info(String.format("Received %d events. Start event id : %d",
response.getEvents().size(), eventId));
if (filter == null) return response.getEvents();

View File

@@ -34,11 +34,8 @@ import com.google.common.collect.Iterables;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -47,7 +44,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CompactionInfoLoader;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FileMetadataLoader;
import org.apache.impala.catalog.Function;
@@ -424,7 +420,9 @@ class DirectMetaProvider implements MetaProvider {
public Map<String, String> getHmsParameters() { return msPartition_.getParameters(); }
@Override
public long getWriteId() { return msPartition_.getWriteId(); }
public long getWriteId() {
return MetastoreShim.getWriteIdFromMSPartition(msPartition_);
}
@Override
public HdfsStorageDescriptor getInputFormatDescriptor() {
@@ -538,44 +536,10 @@ class DirectMetaProvider implements MetaProvider {
Preconditions.checkNotNull(table, "TableMetaRef must be non-null");
Preconditions.checkNotNull(metas, "Partition map must be non-null");
Stopwatch sw = Stopwatch.createStarted();
List<PartitionRef> stalePartitions = new ArrayList<>();
if (!table.isTransactional() || metas.isEmpty()) return stalePartitions;
GetLatestCommittedCompactionInfoRequest request =
new GetLatestCommittedCompactionInfoRequest(dbName, tableName);
if (table.isPartitioned()) {
request.setPartitionnames(metas.keySet().stream()
.map(PartitionRef::getName).collect(Collectors.toList()));
}
GetLatestCommittedCompactionInfoResponse response;
try (MetaStoreClientPool.MetaStoreClient client = msClientPool_.getClient()) {
response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
}
Map<String, Long> partNameToCompactionId = new HashMap<>();
// If the table is partitioned, we must set partition name, otherwise empty result
// will be returned.
if (table.isPartitioned()) {
for (CompactionInfoStruct ci : response.getCompactions()) {
partNameToCompactionId.put(
Preconditions.checkNotNull(ci.getPartitionname()), ci.getId());
}
} else {
CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(),
null);
if (ci != null) {
partNameToCompactionId.put(PartitionRefImpl.UNPARTITIONED_NAME, ci.getId());
}
}
Iterator<Map.Entry<PartitionRef, PartitionMetadata>> iter =
metas.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<PartitionRef, PartitionMetadata> entry = iter.next();
long latestCompactionId = partNameToCompactionId.getOrDefault(
entry.getKey().getName(), -1L);
if (entry.getValue().getLastCompactionId() < latestCompactionId) {
stalePartitions.add(entry.getKey());
iter.remove();
}
}
List<PartitionRef> stalePartitions = MetastoreShim.checkLatestCompaction(
msClientPool_, dbName, tableName, table, metas,
PartitionRefImpl.UNPARTITIONED_NAME);
LOG.debug("Checked the latest compaction id for {}.{} Time taken: {}", dbName,
tableName, PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));
return stalePartitions;

View File

@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.catalog.metastore;
/**
* Contains constants used for CatalogD HMS metrics, which are migrated from
* CatalogMetastoreServer.java. The purpose is to allow the catalog metastore server
* classes (which use Hive 4 APIs) to be excluded from compilation when building against
* Apache Hive 3, without affecting other classes.
*/
public class CatalogHmsUtils {
// Metrics for CatalogD HMS cache
public static final String CATALOGD_CACHE_MISS_METRIC = "catalogd-hms-cache.miss";
public static final String CATALOGD_CACHE_HIT_METRIC = "catalogd-hms-cache.hit";
public static final String CATALOGD_CACHE_API_REQUESTS_METRIC =
"catalogd-hms-cache.api-requests";
// CatalogD HMS Cache - API specific metrics
public static final String CATALOGD_CACHE_API_MISS_METRIC =
"catalogd-hms-cache.cache-miss.api.%s";
public static final String CATALOGD_CACHE_API_HIT_METRIC =
"catalogd-hms-cache.cache-hit.api.%s";
}
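
The per-API metric names above are format strings into which the HMS API name is substituted at runtime; a minimal sketch ("get_table_req" is only an illustrative placeholder):

String hitMetric =
    String.format(CatalogHmsUtils.CATALOGD_CACHE_API_HIT_METRIC, "get_table_req");
// hitMetric is "catalogd-hms-cache.cache-hit.api.get_table_req"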

View File

@@ -98,17 +98,9 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
// Metrics for CatalogD HMS cache
private static final String ACTIVE_CONNECTIONS_METRIC = "metastore.active.connections";
public static final String CATALOGD_CACHE_MISS_METRIC = "catalogd-hms-cache.miss";
public static final String CATALOGD_CACHE_HIT_METRIC = "catalogd-hms-cache.hit";
public static final String CATALOGD_CACHE_API_REQUESTS_METRIC =
"catalogd-hms-cache.api-requests";
// CatalogD HMS Cache - API specific metrics
private static final String RPC_DURATION_FORMAT_METRIC = "metastore.rpc.duration.%s";
public static final String CATALOGD_CACHE_API_MISS_METRIC =
"catalogd-hms-cache.cache-miss.api.%s";
public static final String CATALOGD_CACHE_API_HIT_METRIC =
"catalogd-hms-cache.cache-hit.api.%s";
public static final Set<String> apiNamesSet_ = new HashSet<>();
@@ -132,11 +124,11 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
private void initMetrics() {
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.addCounter(CATALOGD_CACHE_MISS_METRIC);
.addCounter(CatalogHmsUtils.CATALOGD_CACHE_MISS_METRIC);
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.addCounter(CATALOGD_CACHE_HIT_METRIC);
.addCounter(CatalogHmsUtils.CATALOGD_CACHE_HIT_METRIC);
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.addMeter(CATALOGD_CACHE_API_REQUESTS_METRIC);
.addMeter(CatalogHmsUtils.CATALOGD_CACHE_API_REQUESTS_METRIC);
metricsLoggerService_.scheduleAtFixedRate(
new MetricsLogger(this), 0, 1, TimeUnit.MINUTES);
}
@@ -199,7 +191,7 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
apiNamesSet_.add(method.getName());
}
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
.getMeter(CatalogHmsUtils.CATALOGD_CACHE_API_REQUESTS_METRIC)
.mark();
Timer.Context context =
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
@@ -208,13 +200,13 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
Thread.currentThread().getId())
.time();
if (CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getCounter(String.format(CATALOGD_CACHE_API_MISS_METRIC,
.getCounter(String.format(CatalogHmsUtils.CATALOGD_CACHE_API_MISS_METRIC,
method.getName())) == null) {
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.addCounter(String.format(CATALOGD_CACHE_API_MISS_METRIC,
.addCounter(String.format(CatalogHmsUtils.CATALOGD_CACHE_API_MISS_METRIC,
method.getName()));
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.addCounter(String.format(CATALOGD_CACHE_API_HIT_METRIC,
.addCounter(String.format(CatalogHmsUtils.CATALOGD_CACHE_API_HIT_METRIC,
method.getName()));
}
@@ -368,21 +360,22 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
@Override
public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
long apiRequests = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
.getMeter(CatalogHmsUtils.CATALOGD_CACHE_API_REQUESTS_METRIC)
.getCount();
double cacheHitRatio =
getHitRatio(CATALOGD_CACHE_HIT_METRIC, CATALOGD_CACHE_MISS_METRIC);
getHitRatio(CatalogHmsUtils.CATALOGD_CACHE_HIT_METRIC,
CatalogHmsUtils.CATALOGD_CACHE_MISS_METRIC);
double apiRequestsOneMinute =
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
.getMeter(CatalogHmsUtils.CATALOGD_CACHE_API_REQUESTS_METRIC)
.getOneMinuteRate();
double apiRequestsFiveMinutes =
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
.getMeter(CatalogHmsUtils.CATALOGD_CACHE_API_REQUESTS_METRIC)
.getFiveMinuteRate();
double apiRequestsFifteenMinutes =
CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
.getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
.getMeter(CatalogHmsUtils.CATALOGD_CACHE_API_REQUESTS_METRIC)
.getFifteenMinuteRate();
TCatalogdHmsCacheMetrics catalogdHmsCacheMetrics = new TCatalogdHmsCacheMetrics();
@@ -423,8 +416,9 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
// catalogd server.
if (HmsApiNameEnum.contains(apiName)) {
double specificApiCacheHitRatio =
getHitRatio(String.format(CATALOGD_CACHE_API_HIT_METRIC, apiName),
String.format(CATALOGD_CACHE_API_MISS_METRIC, apiName));
getHitRatio(
String.format(CatalogHmsUtils.CATALOGD_CACHE_API_HIT_METRIC, apiName),
String.format(CatalogHmsUtils.CATALOGD_CACHE_API_MISS_METRIC, apiName));
apiMetrics.setCache_hit_ratio(specificApiCacheHitRatio);
}
double specificApiRequestsOneMinute =

View File

@@ -1834,7 +1834,7 @@ public class CatalogOpExecutor {
db.setLocationUri(params.getLocation());
}
if (params.getManaged_location() != null) {
db.setManagedLocationUri(params.getManaged_location());
MetastoreShim.setManagedLocationUri(db, params.getManaged_location());
}
db.setOwnerName(params.getOwner());
db.setOwnerType(PrincipalType.USER);
@@ -2886,8 +2886,8 @@ public class CatalogOpExecutor {
// used for replication.
if (isTableBeingReplicated(hmsClient, hdfsTable)) {
String dbName = Preconditions.checkNotNull(hdfsTable.getDb()).getName();
hmsClient.truncateTable(dbName, hdfsTable.getName(), null, tblTxn.validWriteIds,
tblTxn.writeId);
MetastoreShim.truncateTable(hmsClient, dbName, hdfsTable.getName(), null,
tblTxn.validWriteIds, tblTxn.writeId);
LOG.trace("Time elapsed to truncate table {} using HMS API: {} msec",
hdfsTable.getFullName(), sw.elapsed(TimeUnit.MILLISECONDS));
} else {
@@ -4461,7 +4461,7 @@ public class CatalogOpExecutor {
writeId, table.getFullName());
// Valid write id means committed write id here.
if (!previousWriteIdList.isWriteIdValid(writeId)) {
part.setWriteId(writeId);
MetastoreShim.setWriteIdToMSPartition(part, writeId);
partsToRefresh.add(part);
writeIdsToRefresh.add(writeId);
}
@@ -6441,6 +6441,8 @@ public class CatalogOpExecutor {
boolean isPartitioned = table.getNumClusteringCols() > 0;
// List of all insert events that we call HMS fireInsertEvent() on.
List<InsertEventRequestData> insertEventReqDatas = new ArrayList<>();
// The partition value lists corresponding to insertEventReqDatas, used for Apache Hive 3
List<List<String>> insertEventPartVals = new ArrayList<>();
// List of all existing partitions that we insert into.
List<HdfsPartition> existingPartitions = new ArrayList<>();
if (isPartitioned) {
@@ -6459,6 +6461,7 @@ public class CatalogOpExecutor {
if (!newFiles.isEmpty() || isInsertOverwrite) {
insertEventReqDatas.add(
makeInsertEventData( table, partVals, newFiles, isInsertOverwrite));
insertEventPartVals.add(partVals);
}
}
@@ -6472,6 +6475,7 @@ public class CatalogOpExecutor {
newFiles.size(), table.getFullName(), part.getPartitionName()));
insertEventReqDatas.add(
makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
insertEventPartVals.add(partVals);
}
}
@@ -6485,6 +6489,7 @@ public class CatalogOpExecutor {
newFiles.size(), table.getFullName(), part.getKey()));
insertEventReqDatas.add(
makeInsertEventData(table, partVals, newFiles, isInsertOverwrite));
insertEventPartVals.add(partVals);
}
}
@@ -6494,7 +6499,7 @@ public class CatalogOpExecutor {
MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();
TableInsertEventInfo insertEventInfo = new TableInsertEventInfo(
insertEventReqDatas, isTransactional, txnId, writeId);
insertEventReqDatas, insertEventPartVals, isTransactional, txnId, writeId);
List<Long> eventIds = MetastoreShim.fireInsertEvents(metaStoreClient,
insertEventInfo, table.getDb().getName(), table.getName());
if (isTransactional) {
@@ -6535,7 +6540,9 @@ public class CatalogOpExecutor {
boolean isTransactional = AcidUtils
.isTransactionalTable(tbl.getMetaStoreTable().getParameters());
// in case of unpartitioned table, partVals will be empty
if (!partVals.isEmpty()) insertEventRequestData.setPartitionVal(partVals);
if (!partVals.isEmpty()) {
MetastoreShim.setPartitionVal(insertEventRequestData, partVals);
}
FileSystem fs = tbl.getFileSystem();
for (String file : newFiles) {
try {
@@ -6548,7 +6555,7 @@ public class CatalogOpExecutor {
if (isTransactional) {
String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(filePath, fs);
if (acidDirPath != null) {
insertEventRequestData.addToSubDirectoryList(acidDirPath);
MetastoreShim.addToSubDirectoryList(insertEventRequestData, acidDirPath);
}
}
insertEventRequestData.setReplace(isInsertOverwrite);

View File

@@ -78,7 +78,7 @@ public class DescribeResultFactory {
String comment = null;
if(msDb != null) {
location = msDb.getLocationUri();
managedLocation = msDb.getManagedLocationUri();
managedLocation = MetastoreShim.getManagedLocationUri(msDb);
comment = msDb.getDescription();
}

View File

@@ -42,7 +42,6 @@ import org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLat
import org.apache.impala.catalog.events.MetastoreEvents.MetastoreEventFactory;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
import org.apache.impala.catalog.monitor.CatalogMonitor;
@@ -180,9 +179,7 @@ public class JniCatalog {
if (!BackendConfig.INSTANCE.startHmsServer()) {
return NoOpCatalogMetastoreServer.INSTANCE;
}
int portNumber = BackendConfig.INSTANCE.getHMSPort();
Preconditions.checkState(portNumber > 0, "Invalid port number for HMS service.");
return new CatalogMetastoreServer(catalogOpExecutor);
return MetastoreShim.getCatalogMetastoreServer(catalogOpExecutor);
}
/**

View File

@@ -19,9 +19,6 @@ package org.apache.impala.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.errorprone.annotations.Immutable;
import java.io.IOException;
@@ -31,14 +28,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse;
import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoRequest;
import org.apache.hadoop.hive.metastore.api.GetLatestCommittedCompactionInfoResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.CompactionInfoLoader;
import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
@@ -51,6 +44,7 @@ import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.Reference;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.THdfsFileDesc;
import org.apache.impala.thrift.TPartialPartitionInfo;
import org.apache.impala.thrift.TTransactionalType;
@@ -710,7 +704,7 @@ public class AcidUtils {
// if the provided table id does not match with what CatalogService has we return
// -1 indicating that cached table is stale.
if (tableId != CatalogServiceCatalog.TABLE_ID_UNAVAILABLE
&& tbl.getMetaStoreTable().getId() != tableId) {
&& MetastoreShim.getTableId(tbl.getMetaStoreTable()) != tableId) {
return -1;
}
return compare(tbl.getValidWriteIds(), validWriteIdList);
@@ -789,57 +783,9 @@ public class AcidUtils {
CatalogServiceCatalog catalog, HdfsTable hdfsTable) throws CatalogException {
Stopwatch sw = Stopwatch.createStarted();
Preconditions.checkState(hdfsTable.isReadLockedByCurrentThread());
List<HdfsPartition.Builder> partBuilders = new ArrayList<>();
List<HdfsPartition> hdfsPartitions = hdfsTable.getPartitions()
.stream()
.map(p -> (HdfsPartition) p)
.collect(Collectors.toList());
// fetch the latest compaction info from HMS
GetLatestCommittedCompactionInfoRequest request =
new GetLatestCommittedCompactionInfoRequest(
hdfsTable.getDb().getName(), hdfsTable.getName());
if (hdfsTable.isPartitioned()) {
List<String> partNames = hdfsPartitions.stream()
.map(HdfsPartition::getPartitionName)
.collect(Collectors.toList());
request.setPartitionnames(partNames);
}
GetLatestCommittedCompactionInfoResponse response;
try (MetaStoreClientPool.MetaStoreClient client = catalog.getMetaStoreClient()) {
response = CompactionInfoLoader.getLatestCompactionInfo(client, request);
} catch (Exception e) {
throw new CatalogException("Error getting latest compaction info for "
+ hdfsTable.getFullName(), e);
}
Map<String, Long> partNameToCompactionId = new HashMap<>();
if (hdfsTable.isPartitioned()) {
for (CompactionInfoStruct ci : response.getCompactions()) {
partNameToCompactionId.put(
Preconditions.checkNotNull(ci.getPartitionname()), ci.getId());
}
} else {
CompactionInfoStruct ci = Iterables.getOnlyElement(response.getCompactions(), null);
if (ci != null) {
partNameToCompactionId.put(HdfsTable.DEFAULT_PARTITION_NAME, ci.getId());
}
}
for (HdfsPartition partition : hdfsPartitions) {
long latestCompactionId =
partNameToCompactionId.getOrDefault(partition.getPartitionName(), -1L);
if (partition.getLastCompactionId() >= latestCompactionId) {
continue;
}
HdfsPartition.Builder builder = new HdfsPartition.Builder(partition);
LOG.debug(
"Cached compaction id for {} partition {}: {} but the latest compaction id: {}",
hdfsTable.getFullName(), partition.getPartitionName(),
partition.getLastCompactionId(), latestCompactionId);
builder.setLastCompactionId(latestCompactionId);
partBuilders.add(builder);
}
List<HdfsPartition.Builder> partBuilders = MetastoreShim
.getPartitionsForRefreshingFileMetadata(catalog, hdfsTable);
LOG.debug("Checked the latest compaction id for {}. Time taken: {}",
hdfsTable.getFullName(),
PrintUtils.printTimeMs(sw.stop().elapsed(TimeUnit.MILLISECONDS)));

View File

@@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.compat;
package org.apache.impala.util;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -402,11 +402,11 @@ public class HiveMetadataFormatUtils {
constraintsInfo.append(LINE_DELIM).append("# Constraints").append(LINE_DELIM);
if (PrimaryKeyInfo.isPrimaryKeyInfoNotEmpty(pkInfo)) {
if (isPrimaryKeyInfoNotEmpty(pkInfo)) {
constraintsInfo.append(LINE_DELIM).append("# Primary Key").append(LINE_DELIM);
getPrimaryKeyInformation(constraintsInfo, pkInfo);
}
if (ForeignKeyInfo.isForeignKeyInfoNotEmpty(fkInfo)) {
if (isForeignKeyInfoNotEmpty(fkInfo)) {
constraintsInfo.append(LINE_DELIM).append("# Foreign Keys").append(LINE_DELIM);
getForeignKeysInformation(constraintsInfo, fkInfo);
}
@@ -414,6 +414,14 @@ public class HiveMetadataFormatUtils {
return constraintsInfo.toString();
}
private static boolean isPrimaryKeyInfoNotEmpty(PrimaryKeyInfo info) {
return info != null && !info.getColNames().isEmpty();
}
private static boolean isForeignKeyInfoNotEmpty(ForeignKeyInfo info) {
return info != null && !info.getForeignKeys().isEmpty();
}
private static void getPrimaryKeyInformation(StringBuilder constraintsInfo,
PrimaryKeyInfo pkInfo) {
formatOutput("Table:", pkInfo.getDatabaseName() + "." + pkInfo.getTableName(),

View File

@@ -328,6 +328,8 @@ public class MetaStoreUtil {
public static class TableInsertEventInfo {
// list of partition level insert event info
private final List<InsertEventRequestData> insertEventRequestData_;
// The partition list corresponding to insertEventRequestData, used by Apache Hive 3
private final List<List<String>> insertEventPartVals_;
// transaction id in case this represents a transactional table.
private final long txnId_;
// writeId in case this is for a transactional table.
@@ -335,10 +337,12 @@ public class MetaStoreUtil {
// true in case this is for transactional table.
private final boolean isTransactional_;
public TableInsertEventInfo(
List<InsertEventRequestData> insertEventInfos_, boolean isTransactional,
long txnId, long writeId) {
public TableInsertEventInfo(List<InsertEventRequestData> insertEventInfos_,
List<List<String>> insertEventPartVals_, boolean isTransactional, long txnId,
long writeId) {
Preconditions.checkState(insertEventInfos_.size() == insertEventPartVals_.size());
this.insertEventRequestData_ = insertEventInfos_;
this.insertEventPartVals_ = insertEventPartVals_;
this.txnId_ = txnId;
this.writeId_ = writeId;
this.isTransactional_ = isTransactional;
@@ -352,6 +356,10 @@ public class MetaStoreUtil {
return insertEventRequestData_;
}
public List<List<String>> getInsertEventPartVals() {
return insertEventPartVals_;
}
public long getTxnId() {
return txnId_;
}
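
A minimal sketch of building the updated TableInsertEventInfo (illustrative only; reqData, partition, txnId and writeId are placeholders). The two lists must stay index-aligned, one entry per insert event, with an empty or null entry for unpartitioned tables:

List<InsertEventRequestData> reqDatas = new ArrayList<>();
List<List<String>> partValsList = new ArrayList<>();
reqDatas.add(reqData);
partValsList.add(partition != null ? partition.getValues() : null);
MetaStoreUtil.TableInsertEventInfo info = new MetaStoreUtil.TableInsertEventInfo(
    reqDatas, partValsList, /*isTransactional*/ true, txnId, writeId);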

View File

@@ -826,7 +826,7 @@ public class PartialCatalogInfoWriteIdTest {
*/
private long getTableId(String db, String tbl) throws TException {
try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
return client.getHiveClient().getTable(db, tbl).getId();
return MetastoreShim.getTableId(client.getHiveClient().getTable(db, tbl));
}
}

View File

@@ -59,7 +59,6 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -1169,16 +1168,18 @@ public class MetastoreEventsProcessorTest {
totalNumberOfFilesToAdd, isOverwrite);
try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
List<InsertEventRequestData> partitionInsertEventInfos = new ArrayList<>();
List<List<String>> partitionInsertEventVals = new ArrayList<>();
InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
insertEventRequestData.setFilesAdded(newFiles);
insertEventRequestData.setReplace(isOverwrite);
if (partition != null) {
insertEventRequestData.setPartitionVal(partition.getValues());
MetastoreShim.setPartitionVal(insertEventRequestData, partition.getValues());
}
partitionInsertEventInfos
.add(insertEventRequestData);
partitionInsertEventInfos.add(insertEventRequestData);
partitionInsertEventVals.add(partition != null ? partition.getValues() : null);
MetastoreShim.fireInsertEventHelper(metaStoreClient.getHiveClient(),
partitionInsertEventInfos, msTbl.getDbName(), msTbl.getTableName());
partitionInsertEventInfos, partitionInsertEventVals, msTbl.getDbName(),
msTbl.getTableName());
}
}
@@ -1191,15 +1192,18 @@ public class MetastoreEventsProcessorTest {
List<String> newFiles = addFilesToDirectory(deltaPath, "testFile.",
totalNumberOfFilesToAdd, false);
List<InsertEventRequestData> insertEventReqDatas = new ArrayList<>();
List<List<String>> insertEventVals = new ArrayList<>();
InsertEventRequestData insertEventRequestData = new InsertEventRequestData();
if (partition != null) {
insertEventRequestData.setPartitionVal(partition.getValues());
MetastoreShim.setPartitionVal(insertEventRequestData, partition.getValues());
}
insertEventRequestData.setFilesAdded(newFiles);
insertEventRequestData.setReplace(false);
insertEventReqDatas.add(insertEventRequestData);
insertEventVals.add(partition != null ? partition.getValues() : null);
MetaStoreUtil.TableInsertEventInfo insertEventInfo =
new MetaStoreUtil.TableInsertEventInfo(insertEventReqDatas, true, txnId, writeId);
new MetaStoreUtil.TableInsertEventInfo(insertEventReqDatas, insertEventVals,
true, txnId, writeId);
MetastoreShim.fireInsertEvents(catalog_.getMetaStoreClient(), insertEventInfo,
msTbl.getDbName(), msTbl.getTableName());
}
@@ -1513,12 +1517,13 @@ public class MetastoreEventsProcessorTest {
// created table
assertEquals(3, events.size());
Table existingTable = catalog_.getTable(TEST_DB_NAME, testTblName);
long id = existingTable.getMetaStoreTable().getId();
long id = MetastoreShim.getTableId(existingTable.getMetaStoreTable());
assertEquals("CREATE_TABLE", events.get(0).getEventType());
eventsProcessor_.processEvents(Lists.newArrayList(events.get(0)));
// after processing the create_table the original table should still remain the same
assertEquals(id, catalog_.getTable(TEST_DB_NAME,
testTblName).getMetaStoreTable().getId());
long testId = MetastoreShim.getTableId(catalog_.getTable(TEST_DB_NAME,
testTblName).getMetaStoreTable());
assertEquals(id, testId);
//second event should be drop_table. This event should also be skipped since
// catalog state is more recent than the event
assertEquals("DROP_TABLE", events.get(1).getEventType());
@@ -1540,8 +1545,9 @@ public class MetastoreEventsProcessorTest {
catalog_.getTable(TEST_DB_NAME,
testTblName) instanceof IncompleteTable);
//finally make sure the table is still the same
assertEquals(id, catalog_.getTable(TEST_DB_NAME,
testTblName).getMetaStoreTable().getId());
testId = MetastoreShim.getTableId(catalog_.getTable(TEST_DB_NAME,
testTblName).getMetaStoreTable());
assertEquals(id, testId);
}
/**
@@ -2415,13 +2421,10 @@ public class MetastoreEventsProcessorTest {
// test insert event batching
org.apache.hadoop.hive.metastore.api.Table msTbl;
List<Partition> partitions;
PartitionsRequest req = new PartitionsRequest();
req.setDbName(TEST_DB_NAME);
req.setTblName(testTblName);
try (MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient()) {
msTbl = metaStoreClient.getHiveClient().getTable(TEST_DB_NAME, testTblName);
partitions = metaStoreClient.getHiveClient().getPartitionsRequest(req)
.getPartitions();
partitions = MetastoreShim
.getPartitions(metaStoreClient.getHiveClient(), TEST_DB_NAME, testTblName);
}
assertNotNull(msTbl);
assertNotNull(partitions);
@@ -3859,7 +3862,7 @@ public class MetastoreEventsProcessorTest {
Partition partition = metaStoreClient.getHiveClient().getPartition(db,
tblName, partVal);
partition.getParameters().put(key, val);
partition.setWriteId(writeId);
MetastoreShim.setWriteIdToMSPartition(partition, writeId);
partitions.add(partition);
}
metaStoreClient.getHiveClient().alter_partitions(db, tblName, partitions);

View File

@@ -27,7 +27,6 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.TableLoadingMgr;
import org.apache.impala.catalog.events.MetastoreEvents.EventFactoryForSyncToLatestEvent;
import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
import org.apache.impala.compat.MetastoreShim;

View File

@@ -34,6 +34,7 @@ under the License.
<hive.version>${env.IMPALA_HIVE_VERSION}</hive.version>
<hive.storage.api.version>${env.IMPALA_HIVE_STORAGE_API_VERSION}</hive.storage.api.version>
<hive.major.version>${env.IMPALA_HIVE_MAJOR_VERSION}</hive.major.version>
<hive.dist.type>${env.IMPALA_HIVE_DIST_TYPE}</hive.dist.type>
<hudi.version>${env.IMPALA_HUDI_VERSION}</hudi.version>
<ranger.version>${env.IMPALA_RANGER_VERSION}</ranger.version>
<postgres.jdbc.version>${env.IMPALA_POSTGRES_JDBC_DRIVER_VERSION}</postgres.jdbc.version>

testdata/bin/patch_hive.sh vendored Executable file
View File

@@ -0,0 +1,74 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# This script fixes service startup and task execution problems that occur when
# integrating Apache Hive 3.
# Repair methods:
# - Replace jars
# - Backport patches (Hive 3.x has not had a new release for a long time, so patches
#   are applied as a transitional solution until a new version is released).
set -euo pipefail
. $IMPALA_HOME/bin/report_build_error.sh
setup_report_build_error
if [[ "${USE_APACHE_HIVE}" != true ]]; then
exit 0
fi
# Cache applied patches
PATCHED_CACHE_FILE="$HIVE_SRC_DIR/.patched"
if [ ! -f "$PATCHED_CACHE_FILE" ]; then touch "$PATCHED_CACHE_FILE"; fi
# Apache Hive patch dir
HIVE_PARCH_DIR="${IMPALA_HOME}/testdata/cluster/hive"
# Apply the patch and save the patch name to .patched
function apply_patch {
p="$1"
status=1
while IFS= read -r line
do
if [ "$line" == "$p" ]; then
status=0
break
fi
done < $PATCHED_CACHE_FILE
if [ "$status" = "1" ]; then
echo "Apply patch: $p"
patch -p1 < ${HIVE_PARCH_DIR}/$p
echo $p >> $PATCHED_CACHE_FILE
fi
}
# 1. Fix HIVE-22915
echo "Fix HIVE-22915"
rm $HIVE_HOME/lib/guava-*jar
cp $HADOOP_HOME/share/hadoop/hdfs/lib/guava-*.jar $HIVE_HOME/lib/
# 2. Apply patches
pushd "$HIVE_SRC_DIR"
for file in `ls ${HIVE_PARCH_DIR}/patch*.diff | sort`
do
p=$(basename $file)
apply_patch $p
done
# 3. Repackage the hive submodules affected by the patch
popd

testdata/cluster/hive/README vendored Normal file
View File

@@ -0,0 +1,6 @@
The patches in this directory are for Apache Hive and affect system integration and
testing.
patch0-HIVE-21586.diff:
Contains only the changes to `hive_metastore.thrift`; it is applied so that the generated
C++ files can be compiled.

View File

@@ -0,0 +1,161 @@
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index ad1dc1f769..c92db542bb 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -411,65 +411,13 @@ struct StorageDescriptor {
12: optional bool storedAsSubDirectories // stored as subdirectories or not
}
-// table information
-struct Table {
- 1: string tableName, // name of the table
- 2: string dbName, // database name ('default')
- 3: string owner, // owner of this table
- 4: i32 createTime, // creation time of the table
- 5: i32 lastAccessTime, // last access time (usually this will be filled from HDFS and shouldn't be relied on)
- 6: i32 retention, // retention time
- 7: StorageDescriptor sd, // storage descriptor of the table
- 8: list<FieldSchema> partitionKeys, // partition keys of the table. only primitive types are supported
- 9: map<string, string> parameters, // to store comments or any other user level parameters
- 10: string viewOriginalText, // original view text, null for non-view
- 11: string viewExpandedText, // expanded view text, null for non-view
- 12: string tableType, // table type enum, e.g. EXTERNAL_TABLE
- 13: optional PrincipalPrivilegeSet privileges,
- 14: optional bool temporary=false,
- 15: optional bool rewriteEnabled, // rewrite enabled or not
- 16: optional CreationMetadata creationMetadata, // only for MVs, it stores table names used and txn list at MV creation
- 17: optional string catName, // Name of the catalog the table is in
- 18: optional PrincipalType ownerType = PrincipalType.USER // owner type of this table (default to USER for backward compatibility)
-}
-
-struct Partition {
- 1: list<string> values // string value is converted to appropriate partition key type
- 2: string dbName,
- 3: string tableName,
- 4: i32 createTime,
- 5: i32 lastAccessTime,
- 6: StorageDescriptor sd,
- 7: map<string, string> parameters,
- 8: optional PrincipalPrivilegeSet privileges,
- 9: optional string catName
-}
-
-struct PartitionWithoutSD {
- 1: list<string> values // string value is converted to appropriate partition key type
- 2: i32 createTime,
- 3: i32 lastAccessTime,
- 4: string relativePath,
- 5: map<string, string> parameters,
- 6: optional PrincipalPrivilegeSet privileges
-}
-
-struct PartitionSpecWithSharedSD {
- 1: list<PartitionWithoutSD> partitions,
- 2: StorageDescriptor sd,
-}
-
-struct PartitionListComposingSpec {
- 1: list<Partition> partitions
-}
-
-struct PartitionSpec {
- 1: string dbName,
- 2: string tableName,
- 3: string rootPath,
- 4: optional PartitionSpecWithSharedSD sharedSDPartitionSpec,
- 5: optional PartitionListComposingSpec partitionList,
- 6: optional string catName
+struct CreationMetadata {
+ 1: required string catName
+ 2: required string dbName,
+ 3: required string tblName,
+ 4: required set<string> tablesUsed,
+ 5: optional string validTxnList,
+ 6: optional i64 materializationTime
}
// column statistics
@@ -567,6 +515,67 @@ struct ColumnStatistics {
2: required list<ColumnStatisticsObj> statsObj;
}
+// table information
+struct Table {
+ 1: string tableName, // name of the table
+ 2: string dbName, // database name ('default')
+ 3: string owner, // owner of this table
+ 4: i32 createTime, // creation time of the table
+ 5: i32 lastAccessTime, // last access time (usually this will be filled from HDFS and shouldn't be relied on)
+ 6: i32 retention, // retention time
+ 7: StorageDescriptor sd, // storage descriptor of the table
+ 8: list<FieldSchema> partitionKeys, // partition keys of the table. only primitive types are supported
+ 9: map<string, string> parameters, // to store comments or any other user level parameters
+ 10: string viewOriginalText, // original view text, null for non-view
+ 11: string viewExpandedText, // expanded view text, null for non-view
+ 12: string tableType, // table type enum, e.g. EXTERNAL_TABLE
+ 13: optional PrincipalPrivilegeSet privileges,
+ 14: optional bool temporary=false,
+ 15: optional bool rewriteEnabled, // rewrite enabled or not
+ 16: optional CreationMetadata creationMetadata, // only for MVs, it stores table names used and txn list at MV creation
+ 17: optional string catName, // Name of the catalog the table is in
+ 18: optional PrincipalType ownerType = PrincipalType.USER // owner type of this table (default to USER for backward compatibility)
+}
+
+struct Partition {
+ 1: list<string> values // string value is converted to appropriate partition key type
+ 2: string dbName,
+ 3: string tableName,
+ 4: i32 createTime,
+ 5: i32 lastAccessTime,
+ 6: StorageDescriptor sd,
+ 7: map<string, string> parameters,
+ 8: optional PrincipalPrivilegeSet privileges,
+ 9: optional string catName
+}
+
+struct PartitionWithoutSD {
+ 1: list<string> values // string value is converted to appropriate partition key type
+ 2: i32 createTime,
+ 3: i32 lastAccessTime,
+ 4: string relativePath,
+ 5: map<string, string> parameters,
+ 6: optional PrincipalPrivilegeSet privileges
+}
+
+struct PartitionSpecWithSharedSD {
+ 1: list<PartitionWithoutSD> partitions,
+ 2: StorageDescriptor sd,
+}
+
+struct PartitionListComposingSpec {
+ 1: list<Partition> partitions
+}
+
+struct PartitionSpec {
+ 1: string dbName,
+ 2: string tableName,
+ 3: string rootPath,
+ 4: optional PartitionSpecWithSharedSD sharedSDPartitionSpec,
+ 5: optional PartitionListComposingSpec partitionList,
+ 6: optional string catName
+}
+
struct AggrStats {
1: required list<ColumnStatisticsObj> colStats,
2: required i64 partsFound // number of partitions for which stats were found
@@ -1055,15 +1064,6 @@ struct BasicTxnInfo {
6: optional string partitionname
}
-struct CreationMetadata {
- 1: required string catName
- 2: required string dbName,
- 3: required string tblName,
- 4: required set<string> tablesUsed,
- 5: optional string validTxnList,
- 6: optional i64 materializationTime
-}
-
struct NotificationEventRequest {
1: required i64 lastEvent,
2: optional i32 maxEvents,