mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-11153: Make lock wait time configurable for the users
Currently Impala is using private static final values for lock retry/wait: // Number of retries to acquire an HMS ACID lock. private static final int LOCK_RETRIES = 10; // Time interval between retries of acquiring an HMS ACID lock private static final int LOCK_RETRY_WAIT_SECONDS = 3; This patch changes the logic for waiting on locks. It does an exponential backoff starting from 50 ms up to 30 seconds, and the users can configure a maximum total wait time for the locks. This total maximum wait time is 5 minutes (the earlier 30 seconds was too short in real use cases). Testing: * added e2e tests Change-Id: I055b76138dd30b2c40eedb48905cb3bade1438fc Reviewed-on: http://gerrit.cloudera.org:8080/18289 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
503e58174e
commit
a0922079f5
@@ -156,8 +156,9 @@ Status CatalogOpExecutor::ExecComputeStats(
|
||||
catalog_op_req.__set_sync_ddl(compute_stats_request.sync_ddl);
|
||||
TDdlExecRequest& update_stats_req = catalog_op_req.ddl_params;
|
||||
update_stats_req.__set_ddl_type(TDdlType::ALTER_TABLE);
|
||||
update_stats_req.__set_sync_ddl(compute_stats_request.sync_ddl);
|
||||
update_stats_req.__set_debug_action(compute_stats_request.ddl_params.debug_action);
|
||||
update_stats_req.query_options.__set_sync_ddl(compute_stats_request.sync_ddl);
|
||||
update_stats_req.query_options.__set_debug_action(
|
||||
compute_stats_request.ddl_params.query_options.debug_action);
|
||||
update_stats_req.__set_header(TCatalogServiceRequestHeader());
|
||||
update_stats_req.header.__set_want_minimal_response(FLAGS_use_local_catalog);
|
||||
|
||||
|
||||
@@ -1199,6 +1199,17 @@ Status impala::SetQueryOption(const string& key, const string& value,
|
||||
"Only integer value 0 and above is allowed.", value));
|
||||
}
|
||||
query_options->__set_runtime_in_list_filter_entry_limit(limit);
|
||||
}
|
||||
case TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S: {
|
||||
StringParser::ParseResult result;
|
||||
const int32_t lock_max_wait =
|
||||
StringParser::StringToInt<int32_t>(value.c_str(), value.length(), &result);
|
||||
if (value == nullptr || result != StringParser::PARSE_SUCCESS ||
|
||||
lock_max_wait < 0) {
|
||||
return Status(Substitute("Invalid value for LOCK_MAX_WAIT_TIME_S: '$0'. "
|
||||
"Only integer value 0 and above is allowed.", value));
|
||||
}
|
||||
query_options->__set_lock_max_wait_time_s(lock_max_wait);
|
||||
break;
|
||||
}
|
||||
case TImpalaQueryOptions::ENABLE_REPLAN: {
|
||||
|
||||
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
|
||||
// time we add or remove a query option to/from the enum TImpalaQueryOptions.
|
||||
#define QUERY_OPTS_TABLE\
|
||||
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
|
||||
TImpalaQueryOptions::TEST_REPLAN+ 1);\
|
||||
TImpalaQueryOptions::LOCK_MAX_WAIT_TIME_S + 1);\
|
||||
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
|
||||
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
|
||||
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
|
||||
@@ -281,6 +281,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
|
||||
TQueryOptionLevel::ADVANCED)\
|
||||
QUERY_OPT_FN(test_replan, TEST_REPLAN,\
|
||||
TQueryOptionLevel::ADVANCED)\
|
||||
QUERY_OPT_FN(lock_max_wait_time_s, LOCK_MAX_WAIT_TIME_S, TQueryOptionLevel::REGULAR)\
|
||||
;
|
||||
|
||||
/// Enforce practical limits on some query options to avoid undesired query state.
|
||||
|
||||
@@ -86,6 +86,18 @@ struct TCatalogUpdateResult {
|
||||
6: optional list<CatalogObjects.TCatalogObject> removed_catalog_objects
|
||||
}
|
||||
|
||||
// Subset of query options passed to DDL operations
|
||||
struct TDdlQueryOptions {
|
||||
// True if SYNC_DDL is set in query options
|
||||
1: required bool sync_ddl
|
||||
|
||||
// Passes the debug actions to catalogd if the query option is set.
|
||||
2: optional string debug_action
|
||||
|
||||
// Maximum wait time on an HMS ACID lock in seconds.
|
||||
3: optional i32 lock_max_wait_time_s
|
||||
}
|
||||
|
||||
// Request for executing a DDL operation (CREATE, ALTER, DROP).
|
||||
struct TDdlExecRequest {
|
||||
1: required CatalogServiceVersion protocol_version = CatalogServiceVersion.V1
|
||||
@@ -149,20 +161,17 @@ struct TDdlExecRequest {
|
||||
// Parameters for GRANT/REVOKE privilege
|
||||
21: optional JniCatalog.TGrantRevokePrivParams grant_revoke_priv_params
|
||||
|
||||
// True if SYNC_DDL is set in query options
|
||||
22: required bool sync_ddl
|
||||
|
||||
// Parameters for COMMENT ON
|
||||
23: optional JniCatalog.TCommentOnParams comment_on_params
|
||||
22: optional JniCatalog.TCommentOnParams comment_on_params
|
||||
|
||||
// Parameters for ALTER DATABASE
|
||||
24: optional JniCatalog.TAlterDbParams alter_db_params
|
||||
23: optional JniCatalog.TAlterDbParams alter_db_params
|
||||
|
||||
// Parameters for replaying an exported testcase.
|
||||
25: optional JniCatalog.TCopyTestCaseReq copy_test_case_params
|
||||
24: optional JniCatalog.TCopyTestCaseReq copy_test_case_params
|
||||
|
||||
// Passes the debug actions to catalogd if the query option is set.
|
||||
26: optional string debug_action
|
||||
// Query options passed to DDL operations.
|
||||
25: required TDdlQueryOptions query_options
|
||||
}
|
||||
|
||||
// Response from executing a TDdlExecRequest
|
||||
|
||||
@@ -730,6 +730,9 @@ enum TImpalaQueryOptions {
|
||||
|
||||
// If true, test replan by imposing artificial two executor groups in FE.
|
||||
TEST_REPLAN = 144;
|
||||
|
||||
// Maximum wait time on HMS ACID lock in seconds.
|
||||
LOCK_MAX_WAIT_TIME_S = 145
|
||||
}
|
||||
|
||||
// The summary of a DML statement.
|
||||
|
||||
@@ -588,6 +588,9 @@ struct TQueryOptions {
|
||||
// 1. regular: <num_nodes> nodes with 64MB of per-host estimated memory threshold
|
||||
// 2. large: <num_nodes> nodes with 8PB of per-host estimated memory threshold
|
||||
145: optional bool test_replan = false;
|
||||
|
||||
// See comment in ImpalaService.thrift
|
||||
146: optional i32 lock_max_wait_time_s = 300
|
||||
}
|
||||
|
||||
// Impala currently has three types of sessions: Beeswax, HiveServer2 and external
|
||||
|
||||
@@ -710,11 +710,14 @@ public abstract class Catalog implements AutoCloseable {
|
||||
* 'releaseTableLock()'.
|
||||
* @param dbName Name of the DB where the particular table is.
|
||||
* @param tableName Name of the table where the lock is acquired.
|
||||
* @param lockMaxWaitTime Maximum wait time on the ACID lock.
|
||||
* @throws TransactionException
|
||||
*/
|
||||
public long lockTableStandalone(String dbName, String tableName, HeartbeatContext ctx)
|
||||
public long lockTableStandalone(String dbName, String tableName, HeartbeatContext ctx,
|
||||
int lockMaxWaitTime)
|
||||
throws TransactionException {
|
||||
return lockTableInternal(dbName, tableName, 0L, DataOperationType.NO_TXN, ctx);
|
||||
return lockTableInternal(dbName, tableName, 0L, DataOperationType.NO_TXN, ctx,
|
||||
lockMaxWaitTime);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -724,13 +727,16 @@ public abstract class Catalog implements AutoCloseable {
|
||||
* @param dbName Name of the DB where the particular table is.
|
||||
* @param tableName Name of the table where the lock is acquired.
|
||||
* @param transaction the transaction that needs to lock the table.
|
||||
* @param lockMaxWaitTime Maximum wait time on the ACID lock.
|
||||
* @throws TransactionException
|
||||
*/
|
||||
public void lockTableInTransaction(String dbName, String tableName,
|
||||
Transaction transaction, DataOperationType opType, HeartbeatContext ctx)
|
||||
Transaction transaction, DataOperationType opType, HeartbeatContext ctx,
|
||||
int lockMaxWaitTime)
|
||||
throws TransactionException {
|
||||
Preconditions.checkState(transaction.getId() > 0);
|
||||
lockTableInternal(dbName, tableName, transaction.getId(), opType, ctx);
|
||||
lockTableInternal(dbName, tableName, transaction.getId(), opType, ctx,
|
||||
lockMaxWaitTime);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -739,10 +745,12 @@ public abstract class Catalog implements AutoCloseable {
|
||||
* @param dbName Name of the DB where the particular table is.
|
||||
* @param tableName Name of the table where the lock is acquired.
|
||||
* @param txnId id of the transaction, 0 for standalone locks.
|
||||
* @param lockMaxWaitTime Maximum wait time on the ACID lock.
|
||||
* @throws TransactionException
|
||||
*/
|
||||
private long lockTableInternal(String dbName, String tableName, long txnId,
|
||||
DataOperationType opType, HeartbeatContext ctx) throws TransactionException {
|
||||
DataOperationType opType, HeartbeatContext ctx, int lockMaxWaitTime)
|
||||
throws TransactionException {
|
||||
Preconditions.checkState(txnId >= 0);
|
||||
LockComponent lockComponent = new LockComponent();
|
||||
lockComponent.setDbname(dbName);
|
||||
@@ -753,7 +761,8 @@ public abstract class Catalog implements AutoCloseable {
|
||||
List<LockComponent> lockComponents = Arrays.asList(lockComponent);
|
||||
long lockId = -1L;
|
||||
try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
|
||||
lockId = MetastoreShim.acquireLock(client.getHiveClient(), txnId, lockComponents);
|
||||
lockId = MetastoreShim.acquireLock(client.getHiveClient(), txnId, lockComponents,
|
||||
lockMaxWaitTime);
|
||||
if (txnId == 0L) transactionKeepalive_.addLock(lockId, ctx);
|
||||
}
|
||||
return lockId;
|
||||
|
||||
@@ -103,11 +103,8 @@ public class Hive3MetastoreShimBase {
|
||||
protected static final long MAJOR_VERSION = 3;
|
||||
protected static boolean capabilitiestSet_ = false;
|
||||
|
||||
// Number of retries to acquire an HMS ACID lock.
|
||||
private static final int LOCK_RETRIES = 10;
|
||||
|
||||
// Time interval between retries of acquiring an HMS ACID lock
|
||||
private static final int LOCK_RETRY_WAIT_SECONDS = 3;
|
||||
// Max sleep interval during acquiring an ACID lock.
|
||||
private static final long MAX_SLEEP_INTERVAL_MS = 30000;
|
||||
|
||||
protected final static String HMS_RPC_ERROR_FORMAT_STR =
|
||||
"Error making '%s' RPC to Hive Metastore: ";
|
||||
@@ -618,15 +615,16 @@ public class Hive3MetastoreShimBase {
|
||||
* Creates a lock for the given lock components. Returns the acquired lock, this might
|
||||
* involve some waiting.
|
||||
*
|
||||
* @param client is the HMS client to be used.
|
||||
* @param txnId The transaction ID associated with the lock. Zero if the lock
|
||||
* doesn't belong to a transaction.
|
||||
* @param lockComponents the lock components to include in this lock.
|
||||
* @param client is the HMS client to be used.
|
||||
* @param txnId The transaction ID associated with the lock. Zero if the
|
||||
* lock doesn't belong to a transaction.
|
||||
* @param lockComponents the lock components to include in this lock.
|
||||
* @param maxWaitTimeInSeconds Maximum wait time to acquire the lock.
|
||||
* @return the lock id
|
||||
* @throws TransactionException in case of failure
|
||||
*/
|
||||
public static long acquireLock(IMetaStoreClient client, long txnId,
|
||||
List<LockComponent> lockComponents)
|
||||
List<LockComponent> lockComponents, int maxWaitTimeInSeconds)
|
||||
throws TransactionException {
|
||||
LockRequestBuilder lockRequestBuilder = new LockRequestBuilder();
|
||||
lockRequestBuilder.setUser(TRANSACTION_USER_ID);
|
||||
@@ -638,22 +636,28 @@ public class Hive3MetastoreShimBase {
|
||||
}
|
||||
LockRequest lockRequest = lockRequestBuilder.build();
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
long timeoutTime = startTime + maxWaitTimeInSeconds * 1000;
|
||||
long sleepIntervalMs = 100;
|
||||
LockResponse lockResponse = client.lock(lockRequest);
|
||||
long lockId = lockResponse.getLockid();
|
||||
int retries = 0;
|
||||
while (lockResponse.getState() == LockState.WAITING && retries < LOCK_RETRIES) {
|
||||
while (lockResponse.getState() == LockState.WAITING &&
|
||||
System.currentTimeMillis() < timeoutTime) {
|
||||
try {
|
||||
//TODO: add profile counter for lock waits.
|
||||
LOG.info("Waiting " + String.valueOf(LOCK_RETRY_WAIT_SECONDS) +
|
||||
" seconds for lock " + String.valueOf(lockId) + " of transaction " +
|
||||
// Sleep 'sleepIntervalMs', or the amount of time left until 'timeoutTime'.
|
||||
long sleepMs = Math.min(sleepIntervalMs,
|
||||
Math.abs(timeoutTime - System.currentTimeMillis()));
|
||||
LOG.debug("Waiting " + String.valueOf(sleepMs) +
|
||||
" milliseconds for lock " + String.valueOf(lockId) + " of transaction " +
|
||||
Long.toString(txnId));
|
||||
Thread.sleep(LOCK_RETRY_WAIT_SECONDS * 1000);
|
||||
++retries;
|
||||
Thread.sleep(sleepMs);
|
||||
sleepIntervalMs = Math.min(MAX_SLEEP_INTERVAL_MS,
|
||||
sleepIntervalMs * 2);
|
||||
lockResponse = client.checkLock(lockId);
|
||||
} catch (InterruptedException e) {
|
||||
// Since wait time and number of retries is configurable it wouldn't add
|
||||
// much value to make acquireLock() interruptible so we just swallow the
|
||||
// exception here.
|
||||
// Since wait time is configurable it wouldn't add much value to make
|
||||
// acquireLock() interruptible so we just swallow the exception here.
|
||||
}
|
||||
}
|
||||
if (lockResponse.getState() == LockState.ACQUIRED) {
|
||||
@@ -781,5 +785,4 @@ public class Hive3MetastoreShimBase {
|
||||
return wh.getDefaultTablePath(db, tbl.getTableName().toLowerCase(), isExternal)
|
||||
.toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -392,7 +392,7 @@ public class CatalogOpExecutor {
|
||||
Optional<TTableName> tTableName = Optional.empty();
|
||||
TDdlType ddl_type = ddlRequest.ddl_type;
|
||||
try {
|
||||
boolean syncDdl = ddlRequest.isSync_ddl();
|
||||
boolean syncDdl = ddlRequest.getQuery_options().isSync_ddl();
|
||||
switch (ddl_type) {
|
||||
case ALTER_DATABASE:
|
||||
TAlterDbParams alter_db_params = ddlRequest.getAlter_db_params();
|
||||
@@ -404,8 +404,8 @@ public class CatalogOpExecutor {
|
||||
TAlterTableParams alter_table_params = ddlRequest.getAlter_table_params();
|
||||
tTableName = Optional.of(alter_table_params.getTable_name());
|
||||
catalogOpMetric_.increment(ddl_type, tTableName);
|
||||
alterTable(alter_table_params, ddlRequest.getDebug_action(), wantMinimalResult,
|
||||
response);
|
||||
alterTable(alter_table_params, ddlRequest.getQuery_options().getDebug_action(),
|
||||
wantMinimalResult, response);
|
||||
break;
|
||||
case ALTER_VIEW:
|
||||
TCreateOrAlterViewParams alter_view_params = ddlRequest.getAlter_view_params();
|
||||
@@ -480,13 +480,15 @@ public class CatalogOpExecutor {
|
||||
catalogOpMetric_.increment(ddl_type, tTableName);
|
||||
// Dropped tables and views are already returned as minimal results, so don't
|
||||
// need to pass down wantMinimalResult here.
|
||||
dropTableOrView(drop_table_or_view_params, response);
|
||||
dropTableOrView(drop_table_or_view_params, response,
|
||||
ddlRequest.getQuery_options().getLock_max_wait_time_s());
|
||||
break;
|
||||
case TRUNCATE_TABLE:
|
||||
TTruncateParams truncate_params = ddlRequest.getTruncate_params();
|
||||
tTableName = Optional.of(truncate_params.getTable_name());
|
||||
catalogOpMetric_.increment(ddl_type, tTableName);
|
||||
truncateTable(truncate_params, wantMinimalResult, response);
|
||||
truncateTable(truncate_params, wantMinimalResult, response,
|
||||
ddlRequest.getQuery_options().getLock_max_wait_time_s());
|
||||
break;
|
||||
case DROP_FUNCTION:
|
||||
catalogOpMetric_.increment(ddl_type, Optional.empty());
|
||||
@@ -2563,8 +2565,8 @@ public class CatalogOpExecutor {
|
||||
* In case of transactional tables acquires an exclusive HMS table lock before
|
||||
* executing the drop operation.
|
||||
*/
|
||||
private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp)
|
||||
throws ImpalaException {
|
||||
private void dropTableOrView(TDropTableOrViewParams params, TDdlExecResponse resp,
|
||||
int lockMaxWaitTime) throws ImpalaException {
|
||||
TableName tableName = TableName.fromThrift(params.getTable_name());
|
||||
Preconditions.checkState(tableName != null && tableName.isFullyQualified());
|
||||
Preconditions.checkState(!catalog_.isBlacklistedTable(tableName) || params.if_exists,
|
||||
@@ -2604,7 +2606,8 @@ public class CatalogOpExecutor {
|
||||
HeartbeatContext ctx = new HeartbeatContext(
|
||||
String.format("Drop table/view %s.%s", tableName.getDb(), tableName.getTbl()),
|
||||
System.nanoTime());
|
||||
lockId = catalog_.lockTableStandalone(tableName.getDb(), tableName.getTbl(), ctx);
|
||||
lockId = catalog_.lockTableStandalone(tableName.getDb(), tableName.getTbl(), ctx,
|
||||
lockMaxWaitTime);
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -2790,7 +2793,8 @@ public class CatalogOpExecutor {
|
||||
* TODO truncate specified partitions.
|
||||
*/
|
||||
private void truncateTable(TTruncateParams params, boolean wantMinimalResult,
|
||||
TDdlExecResponse resp) throws ImpalaException {
|
||||
TDdlExecResponse resp, int lockMaxWaitTime)
|
||||
throws ImpalaException {
|
||||
TTableName tblName = params.getTable_name();
|
||||
Table table = null;
|
||||
try {
|
||||
@@ -2819,7 +2823,7 @@ public class CatalogOpExecutor {
|
||||
long newCatalogVersion = 0;
|
||||
try {
|
||||
if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
|
||||
newCatalogVersion = truncateTransactionalTable(params, table);
|
||||
newCatalogVersion = truncateTransactionalTable(params, table, lockMaxWaitTime);
|
||||
} else if (table instanceof FeIcebergTable) {
|
||||
newCatalogVersion = truncateIcebergTable(params, table);
|
||||
} else {
|
||||
@@ -2851,8 +2855,8 @@ public class CatalogOpExecutor {
|
||||
* After that empty directory creation it removes stats-related parameters of
|
||||
* the table and its partitions.
|
||||
*/
|
||||
private long truncateTransactionalTable(TTruncateParams params, Table table)
|
||||
throws ImpalaException {
|
||||
private long truncateTransactionalTable(TTruncateParams params, Table table,
|
||||
int lockMaxWaitTime) throws ImpalaException {
|
||||
Preconditions.checkState(table.isWriteLockedByCurrentThread());
|
||||
Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
|
||||
catalog_.getLock().writeLock().unlock();
|
||||
@@ -2871,7 +2875,7 @@ public class CatalogOpExecutor {
|
||||
table.releaseWriteLock();
|
||||
//TODO: if possible, set DataOperationType to something better than NO_TXN.
|
||||
catalog_.lockTableInTransaction(tblName.getDb(), tblName.getTbl(), txn,
|
||||
DataOperationType.NO_TXN, ctx);
|
||||
DataOperationType.NO_TXN, ctx, lockMaxWaitTime);
|
||||
tryWriteLock(table, "truncating");
|
||||
LOG.trace("Time elapsed after taking write lock on table {}: {} msec",
|
||||
table.getFullName(), sw.elapsed(TimeUnit.MILLISECONDS));
|
||||
|
||||
@@ -145,6 +145,7 @@ import org.apache.impala.thrift.TCommentOnParams;
|
||||
import org.apache.impala.thrift.TCreateDropRoleParams;
|
||||
import org.apache.impala.thrift.TDataSink;
|
||||
import org.apache.impala.thrift.TDdlExecRequest;
|
||||
import org.apache.impala.thrift.TDdlQueryOptions;
|
||||
import org.apache.impala.thrift.TDdlType;
|
||||
import org.apache.impala.thrift.TDescribeHistoryParams;
|
||||
import org.apache.impala.thrift.TDescribeOutputStyle;
|
||||
@@ -828,12 +829,15 @@ public class Frontend {
|
||||
header.setWant_minimal_response(
|
||||
BackendConfig.INSTANCE.getBackendCfg().use_local_catalog);
|
||||
ddl.getDdl_params().setHeader(header);
|
||||
ddl.getDdl_params().setSync_ddl(ddl.isSync_ddl());
|
||||
// forward debug_actions to the catalogd
|
||||
// Forward relevant query options to the catalogd.
|
||||
TDdlQueryOptions ddlQueryOpts = new TDdlQueryOptions();
|
||||
ddlQueryOpts.setSync_ddl(result.getQuery_options().isSync_ddl());
|
||||
if (result.getQuery_options().isSetDebug_action()) {
|
||||
ddl.getDdl_params()
|
||||
.setDebug_action(result.getQuery_options().getDebug_action());
|
||||
ddlQueryOpts.setDebug_action(result.getQuery_options().getDebug_action());
|
||||
}
|
||||
ddlQueryOpts.setLock_max_wait_time_s(
|
||||
result.getQuery_options().lock_max_wait_time_s);
|
||||
ddl.getDdl_params().setQuery_options(ddlQueryOpts);
|
||||
} else if (ddl.getOp_type() == TCatalogOpType.RESET_METADATA) {
|
||||
ddl.getReset_metadata_params().setSync_ddl(ddl.isSync_ddl());
|
||||
ddl.getReset_metadata_params().setRefresh_updated_hms_partitions(
|
||||
@@ -2057,7 +2061,7 @@ public class Frontend {
|
||||
insertStmt.getPartitionKeyValues());
|
||||
}
|
||||
createLockForInsert(txnId, tables, targetTable, insertStmt.isOverwrite(),
|
||||
staticPartitionTarget);
|
||||
staticPartitionTarget, queryOptions);
|
||||
long writeId = allocateWriteId(queryCtx, targetTable);
|
||||
insertStmt.setWriteId(writeId);
|
||||
|
||||
@@ -2544,10 +2548,12 @@ public class Frontend {
|
||||
* @param isOverwrite true when the INSERT stmt is an INSERT OVERWRITE
|
||||
* @param staticPartitionTarget the static partition target in case of static partition
|
||||
* INSERT
|
||||
* @param queryOptions the query options for this INSERT statement
|
||||
* @throws TransactionException
|
||||
*/
|
||||
private void createLockForInsert(Long txnId, Collection<FeTable> tables,
|
||||
FeTable targetTable, boolean isOverwrite, String staticPartitionTarget)
|
||||
FeTable targetTable, boolean isOverwrite, String staticPartitionTarget,
|
||||
TQueryOptions queryOptions)
|
||||
throws TransactionException {
|
||||
Preconditions.checkState(
|
||||
AcidUtils.isTransactionalTable(targetTable.getMetaStoreTable().getParameters()));
|
||||
@@ -2583,7 +2589,8 @@ public class Frontend {
|
||||
}
|
||||
try (MetaStoreClient client = metaStoreClientPool_.getClient()) {
|
||||
IMetaStoreClient hmsClient = client.getHiveClient();
|
||||
MetastoreShim.acquireLock(hmsClient, txnId, lockComponents);
|
||||
MetastoreShim.acquireLock(hmsClient, txnId, lockComponents,
|
||||
queryOptions.lock_max_wait_time_s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ import org.apache.impala.thrift.TAlterDbSetOwnerParams;
|
||||
import org.apache.impala.thrift.TAlterDbType;
|
||||
import org.apache.impala.thrift.TCreateDbParams;
|
||||
import org.apache.impala.thrift.TDdlExecRequest;
|
||||
import org.apache.impala.thrift.TDdlQueryOptions;
|
||||
import org.apache.impala.thrift.TDdlType;
|
||||
import org.apache.impala.thrift.TDropDbParams;
|
||||
import org.apache.impala.thrift.TOwnerType;
|
||||
@@ -118,6 +119,7 @@ public class AlterDatabaseTest {
|
||||
*/
|
||||
private static TDdlExecRequest dropDbRequest() {
|
||||
TDdlExecRequest request = new TDdlExecRequest();
|
||||
request.setQuery_options(new TDdlQueryOptions());
|
||||
request.setDdl_type(TDdlType.DROP_DATABASE);
|
||||
TDropDbParams dropDbParams = new TDropDbParams();
|
||||
dropDbParams.setDb(TEST_ALTER_DB);
|
||||
@@ -132,6 +134,7 @@ public class AlterDatabaseTest {
|
||||
*/
|
||||
private static TDdlExecRequest createDbRequest() {
|
||||
TDdlExecRequest request = new TDdlExecRequest();
|
||||
request.setQuery_options(new TDdlQueryOptions());
|
||||
request.setDdl_type(TDdlType.CREATE_DATABASE);
|
||||
TCreateDbParams createDbParams = new TCreateDbParams();
|
||||
createDbParams.setDb(TEST_ALTER_DB);
|
||||
@@ -217,6 +220,7 @@ public class AlterDatabaseTest {
|
||||
alterDbParams.setAlter_type(TAlterDbType.SET_OWNER);
|
||||
alterDbParams.setSet_owner_params(alterDbSetOwnerParams);
|
||||
TDdlExecRequest request = new TDdlExecRequest();
|
||||
request.setQuery_options(new TDdlQueryOptions());
|
||||
request.setDdl_type(TDdlType.ALTER_DATABASE);
|
||||
request.setAlter_db_params(alterDbParams);
|
||||
return request;
|
||||
|
||||
@@ -135,6 +135,7 @@ import org.apache.impala.thrift.TCreateFunctionParams;
|
||||
import org.apache.impala.thrift.TCreateTableLikeParams;
|
||||
import org.apache.impala.thrift.TCreateTableParams;
|
||||
import org.apache.impala.thrift.TDdlExecRequest;
|
||||
import org.apache.impala.thrift.TDdlQueryOptions;
|
||||
import org.apache.impala.thrift.TDdlType;
|
||||
import org.apache.impala.thrift.TDropDbParams;
|
||||
import org.apache.impala.thrift.TDropFunctionParams;
|
||||
@@ -3198,6 +3199,7 @@ public class MetastoreEventsProcessorTest {
|
||||
*/
|
||||
private void dropTableFromImpala(String dbName, String tblName) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.DROP_TABLE);
|
||||
TDropTableOrViewParams dropTableParams = new TDropTableOrViewParams();
|
||||
dropTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3212,6 +3214,7 @@ public class MetastoreEventsProcessorTest {
|
||||
public static void createDatabaseFromImpala(CatalogOpExecutor catalogOpExecutor,
|
||||
String dbName, String desc) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.CREATE_DATABASE);
|
||||
TCreateDbParams createDbParams = new TCreateDbParams();
|
||||
createDbParams.setDb(dbName);
|
||||
@@ -3231,6 +3234,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterDbSetOwnerFromImpala(
|
||||
String dbName, String owner, TOwnerType ownerType) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_DATABASE);
|
||||
TAlterDbParams alterDbParams = new TAlterDbParams();
|
||||
alterDbParams.setDb(dbName);
|
||||
@@ -3248,6 +3252,7 @@ public class MetastoreEventsProcessorTest {
|
||||
*/
|
||||
private void dropDatabaseCascadeFromImpala(String dbName) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.DROP_DATABASE);
|
||||
TDropDbParams dropDbParams = new TDropDbParams();
|
||||
dropDbParams.setDb(dbName);
|
||||
@@ -3264,6 +3269,7 @@ public class MetastoreEventsProcessorTest {
|
||||
createTableLikeParams.setTable_name(new TTableName(destDb, destTbl));
|
||||
createTableLikeParams.setIs_external(false);
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.CREATE_TABLE_LIKE);
|
||||
req.create_table_like_params = createTableLikeParams;
|
||||
catalogOpExecutor_.execDdlRequest(req);
|
||||
@@ -3282,6 +3288,7 @@ public class MetastoreEventsProcessorTest {
|
||||
String tblName, Map<String, String> tblParams, boolean isPartitioned)
|
||||
throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.CREATE_TABLE);
|
||||
TCreateTableParams createTableParams = new TCreateTableParams();
|
||||
createTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3317,6 +3324,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void createScalarFunctionFromImpala(ScalarFunction fn) throws
|
||||
ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.CREATE_FUNCTION);
|
||||
TCreateFunctionParams createFunctionParams = new TCreateFunctionParams();
|
||||
createFunctionParams.setFn(fn.toThrift());
|
||||
@@ -3329,6 +3337,7 @@ public class MetastoreEventsProcessorTest {
|
||||
*/
|
||||
private void dropScalarFunctionFromImapala(ScalarFunction fn) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.DROP_FUNCTION);
|
||||
TDropFunctionParams dropFunctionParams = new TDropFunctionParams();
|
||||
dropFunctionParams.setFn_name(fn.getFunctionName().toThrift());
|
||||
@@ -3343,6 +3352,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void renameTableFromImpala(String oldTblName, String newTblName)
|
||||
throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableOrViewRenameParams renameParams = new TAlterTableOrViewRenameParams();
|
||||
renameParams.new_table_name = new TTableName(TEST_DB_NAME, newTblName);
|
||||
@@ -3360,6 +3370,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableAddColsFromImpala(String dbName, String tblName, String colName,
|
||||
TPrimitiveType colType) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3379,6 +3390,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableRemoveColFromImpala(
|
||||
String dbName, String tblName, String colName) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3395,6 +3407,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableReplaceColFromImpala(
|
||||
String dbName, String tblName, List<TColumn> newCols) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3415,6 +3428,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableAddPartition(
|
||||
String dbName, String tblName, TPartitionDef partitionDef) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3433,6 +3447,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableDropPartition(String dbName, String tblName,
|
||||
List<TPartitionKeyValue> keyValue) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3451,6 +3466,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableSetFileFormatFromImpala(
|
||||
String dbName, String tblName, THdfsFileFormat fileformat) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3469,6 +3485,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableSetRowFormatFromImpala(
|
||||
String dbName, String tblName, String fieldTerminator) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3488,6 +3505,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableSetOwnerFromImpala(String dbName, String tblName, String owner)
|
||||
throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3507,6 +3525,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableSetLocationFromImpala(
|
||||
String dbName, String tblName, String location) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3524,6 +3543,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableRenameFromImpala(String dbName, String tblName, String newTable)
|
||||
throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(dbName, tblName));
|
||||
@@ -3541,6 +3561,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableSetTblPropertiesFromImpala(String tblName)
|
||||
throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
|
||||
@@ -3564,6 +3585,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableComputeStats(String tblName, List<List<String>> partValsList)
|
||||
throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
@@ -3595,6 +3617,7 @@ public class MetastoreEventsProcessorTest {
|
||||
private void alterTableSetPartitionPropertiesFromImpala(
|
||||
String tblName, List<TPartitionKeyValue> partKeyVal) throws ImpalaException {
|
||||
TDdlExecRequest req = new TDdlExecRequest();
|
||||
req.setQuery_options(new TDdlQueryOptions());
|
||||
req.setDdl_type(TDdlType.ALTER_TABLE);
|
||||
TAlterTableParams alterTableParams = new TAlterTableParams();
|
||||
alterTableParams.setTable_name(new TTableName(TEST_DB_NAME, tblName));
|
||||
|
||||
@@ -21,7 +21,7 @@ import os
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from hive_metastore.ttypes import CommitTxnRequest, OpenTxnRequest
|
||||
from hive_metastore.ttypes import CommitTxnRequest, LockType, OpenTxnRequest
|
||||
from subprocess import check_call
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.skip import (SkipIf, SkipIfHive2, SkipIfCatalogV2, SkipIfS3, SkipIfABFS,
|
||||
@@ -304,6 +304,52 @@ class TestAcid(ImpalaTestSuite):
|
||||
commit_req.txnid = txn_id
|
||||
return self.hive_client.commit_txn(commit_req)
|
||||
|
||||
@SkipIfS3.hive
|
||||
@SkipIfGCS.hive
|
||||
@SkipIfCOS.hive
|
||||
@SkipIfABFS.hive
|
||||
@SkipIfADLS.hive
|
||||
@SkipIfIsilon.hive
|
||||
@SkipIfLocal.hive
|
||||
def test_lock_timings(self, vector, unique_database):
|
||||
def elapsed_time_for_query(query):
|
||||
t_start = time.time()
|
||||
self.execute_query_expect_failure(self.client, query)
|
||||
return time.time() - t_start
|
||||
|
||||
tbl_name = "test_lock"
|
||||
tbl = "{0}.{1}".format(unique_database, tbl_name)
|
||||
self.execute_query("create table {} (i int) tblproperties"
|
||||
"('transactional'='true',"
|
||||
"'transactional_properties'='insert_only')".format(tbl))
|
||||
acid_util = AcidTxn(self.hive_client)
|
||||
lock_resp = acid_util.lock(0, unique_database, tbl_name, LockType.EXCLUSIVE)
|
||||
try:
|
||||
if self.exploration_strategy() == 'exhaustive':
|
||||
elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
|
||||
assert elapsed > 300 and elapsed < 305
|
||||
self.execute_query("set lock_max_wait_time_s=20")
|
||||
elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
|
||||
assert elapsed > 20 and elapsed < 25
|
||||
|
||||
self.execute_query("set lock_max_wait_time_s=0")
|
||||
elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
|
||||
assert elapsed < 5
|
||||
|
||||
self.execute_query("set lock_max_wait_time_s=10")
|
||||
elapsed = elapsed_time_for_query("insert into {} values (1)".format(tbl))
|
||||
assert elapsed > 10 and elapsed < 15
|
||||
|
||||
self.execute_query("set lock_max_wait_time_s=2")
|
||||
elapsed = elapsed_time_for_query("truncate table {}".format(tbl))
|
||||
assert elapsed > 2 and elapsed < 7
|
||||
|
||||
self.execute_query("set lock_max_wait_time_s=5")
|
||||
elapsed = elapsed_time_for_query("drop table {}".format(tbl))
|
||||
assert elapsed > 5 and elapsed < 10
|
||||
finally:
|
||||
acid_util.unlock(lock_resp.lockid)
|
||||
|
||||
@SkipIfHive2.acid
|
||||
@SkipIfS3.hive
|
||||
@SkipIfGCS.hive
|
||||
|
||||
Reference in New Issue
Block a user