mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-1427: Improvements to "Unknown disk-ID" warning
- Removes the runtime unknown disk ID reporting and instead moves it to the explain plan as a counter that prints the number of scan ranges missing disk IDs in the corresponding HDFS scan nodes. - Adds a warning to the header of query profile/explain plan with a list of tables missing disk ids. - Removes reference to enabling dfs block metadata configuration, since it doesn't apply anymore. - Removes VolumeId terminology from the runtime profile. Change-Id: Iddb132ff7ad66f3291b93bf9d8061bd0525ef1b2 Reviewed-on: http://gerrit.cloudera.org:8080/5828 Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com> Tested-by: Impala Public Jenkins
This commit is contained in:
committed by
Impala Public Jenkins
parent
d074f71d81
commit
fcc2d817b8
@@ -59,9 +59,9 @@
|
||||
|
||||
DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
|
||||
"that a scan node will wait for expected runtime filters to arrive.");
|
||||
DEFINE_bool(suppress_unknown_disk_id_warnings, false,
|
||||
"Suppress unknown disk id warnings generated when the HDFS implementation does not"
|
||||
" provide volume/disk information.");
|
||||
|
||||
// TODO: Remove this flag in a compatibility-breaking release.
|
||||
DEFINE_bool(suppress_unknown_disk_id_warnings, false, "Deprecated.");
|
||||
|
||||
#ifndef NDEBUG
|
||||
DECLARE_bool(skip_file_runtime_filtering);
|
||||
@@ -89,7 +89,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
|
||||
reader_context_(NULL),
|
||||
tuple_desc_(NULL),
|
||||
hdfs_table_(NULL),
|
||||
unknown_disk_id_warned_(false),
|
||||
initial_ranges_issued_(false),
|
||||
counters_running_(false),
|
||||
max_compressed_text_file_length_(NULL),
|
||||
@@ -263,14 +262,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
|
||||
}
|
||||
|
||||
bool expected_local = params.__isset.is_remote && !params.is_remote;
|
||||
if (expected_local && params.volume_id == -1) {
|
||||
if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) {
|
||||
runtime_profile()->AppendExecOption("Missing Volume Id");
|
||||
runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK));
|
||||
unknown_disk_id_warned_ = true;
|
||||
}
|
||||
++num_ranges_missing_volume_id;
|
||||
}
|
||||
if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id;
|
||||
|
||||
bool try_cache = params.is_cached;
|
||||
if (runtime_state_->query_options().disable_cached_reads) {
|
||||
|
||||
@@ -305,10 +305,6 @@ class HdfsScanNodeBase : public ScanNode {
|
||||
/// The root of the table's Avro schema, if we're scanning an Avro table.
|
||||
ScopedAvroSchemaElement avro_schema_;
|
||||
|
||||
/// If true, the warning that some disk ids are unknown was logged. Only log this once
|
||||
/// per scan node since it can be noisy.
|
||||
bool unknown_disk_id_warned_;
|
||||
|
||||
/// Partitions scanned by this scan node.
|
||||
std::unordered_set<int64_t> partition_ids_;
|
||||
|
||||
|
||||
@@ -60,6 +60,7 @@ static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
|
||||
static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores";
|
||||
static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
|
||||
static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
|
||||
static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
|
||||
|
||||
ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx,
|
||||
ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server,
|
||||
@@ -428,6 +429,18 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
|
||||
summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
|
||||
}
|
||||
|
||||
if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
|
||||
!query_exec_request.query_ctx.tables_missing_diskids.empty()) {
|
||||
stringstream ss;
|
||||
const vector<TTableName>& tbls =
|
||||
query_exec_request.query_ctx.tables_missing_diskids;
|
||||
for (int i = 0; i < tbls.size(); ++i) {
|
||||
if (i != 0) ss << ",";
|
||||
ss << tbls[i].db_name << "." << tbls[i].table_name;
|
||||
}
|
||||
summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
|
||||
}
|
||||
|
||||
{
|
||||
lock_guard<mutex> l(lock_);
|
||||
// Don't start executing the query if Cancel() was called concurrently with Exec().
|
||||
|
||||
@@ -322,6 +322,9 @@ struct TQueryCtx {
|
||||
// backend in NativeEvalExprsWithoutRow() in FESupport. This flag is only advisory to
|
||||
// avoid the overhead of codegen and can be ignored if codegen is needed functionally.
|
||||
14: optional bool disable_codegen_hint = false;
|
||||
|
||||
// List of tables with scan ranges that map to blocks with missing disk IDs.
|
||||
15: optional list<CatalogObjects.TTableName> tables_missing_diskids
|
||||
}
|
||||
|
||||
// Context to collect information, which is shared among all instances of that plan
|
||||
|
||||
@@ -94,220 +94,216 @@ error_codes = (
|
||||
("SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT", 25,
|
||||
"Snappy: Decompressed size is not correct."),
|
||||
|
||||
("HDFS_SCAN_NODE_UNKNOWN_DISK", 26, "Unknown disk id. "
|
||||
"This will negatively affect performance. "
|
||||
"Check your hdfs settings to enable block location metadata."),
|
||||
|
||||
("FRAGMENT_EXECUTOR", 27, "Reserved resource size ($0) is larger than "
|
||||
("FRAGMENT_EXECUTOR", 26, "Reserved resource size ($0) is larger than "
|
||||
"query mem limit ($1), and will be restricted to $1. Configure the reservation "
|
||||
"size by setting RM_INITIAL_MEM."),
|
||||
|
||||
("PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH", 28,
|
||||
("PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH", 27,
|
||||
"Cannot perform join at hash join node with id $0."
|
||||
" The input data was partitioned the maximum number of $1 times."
|
||||
" This could mean there is significant skew in the data or the memory limit is"
|
||||
" set too low."),
|
||||
|
||||
("PARTITIONED_AGG_MAX_PARTITION_DEPTH", 29,
|
||||
("PARTITIONED_AGG_MAX_PARTITION_DEPTH", 28,
|
||||
"Cannot perform aggregation at hash aggregation node with id $0."
|
||||
" The input data was partitioned the maximum number of $1 times."
|
||||
" This could mean there is significant skew in the data or the memory limit is"
|
||||
" set too low."),
|
||||
|
||||
("MISSING_BUILTIN", 30, "Builtin '$0' with symbol '$1' does not exist. "
|
||||
("MISSING_BUILTIN", 29, "Builtin '$0' with symbol '$1' does not exist. "
|
||||
"Verify that all your impalads are the same version."),
|
||||
|
||||
("RPC_GENERAL_ERROR", 31, "RPC Error: $0"),
|
||||
("RPC_RECV_TIMEOUT", 32, "RPC recv timed out: $0"),
|
||||
("RPC_GENERAL_ERROR", 30, "RPC Error: $0"),
|
||||
("RPC_RECV_TIMEOUT", 31, "RPC recv timed out: $0"),
|
||||
|
||||
("UDF_VERIFY_FAILED", 33,
|
||||
("UDF_VERIFY_FAILED", 32,
|
||||
"Failed to verify function $0 from LLVM module $1, see log for more details."),
|
||||
|
||||
("PARQUET_CORRUPT_RLE_BYTES", 34, "File $0 corrupt. RLE level data bytes = $1"),
|
||||
("PARQUET_CORRUPT_RLE_BYTES", 33, "File $0 corrupt. RLE level data bytes = $1"),
|
||||
|
||||
("AVRO_DECIMAL_RESOLUTION_ERROR", 35, "Column '$0' has conflicting Avro decimal types. "
|
||||
("AVRO_DECIMAL_RESOLUTION_ERROR", 34, "Column '$0' has conflicting Avro decimal types. "
|
||||
"Table schema $1: $2, file schema $1: $3"),
|
||||
|
||||
("AVRO_DECIMAL_METADATA_MISMATCH", 36, "Column '$0' has conflicting Avro decimal types. "
|
||||
("AVRO_DECIMAL_METADATA_MISMATCH", 35, "Column '$0' has conflicting Avro decimal types. "
|
||||
"Declared $1: $2, $1 in table's Avro schema: $3"),
|
||||
|
||||
("AVRO_SCHEMA_RESOLUTION_ERROR", 37, "Unresolvable types for column '$0': "
|
||||
("AVRO_SCHEMA_RESOLUTION_ERROR", 36, "Unresolvable types for column '$0': "
|
||||
"table type: $1, file type: $2"),
|
||||
|
||||
("AVRO_SCHEMA_METADATA_MISMATCH", 38, "Unresolvable types for column '$0': "
|
||||
("AVRO_SCHEMA_METADATA_MISMATCH", 37, "Unresolvable types for column '$0': "
|
||||
"declared column type: $1, table's Avro schema type: $2"),
|
||||
|
||||
("AVRO_UNSUPPORTED_DEFAULT_VALUE", 39, "Field $0 is missing from file and default "
|
||||
("AVRO_UNSUPPORTED_DEFAULT_VALUE", 38, "Field $0 is missing from file and default "
|
||||
"values of type $1 are not yet supported."),
|
||||
|
||||
("AVRO_MISSING_FIELD", 40, "Inconsistent table metadata. Mismatch between column "
|
||||
("AVRO_MISSING_FIELD", 39, "Inconsistent table metadata. Mismatch between column "
|
||||
"definition and Avro schema: cannot read field $0 because there are only $1 fields."),
|
||||
|
||||
("AVRO_MISSING_DEFAULT", 41,
|
||||
("AVRO_MISSING_DEFAULT", 40,
|
||||
"Field $0 is missing from file and does not have a default value."),
|
||||
|
||||
("AVRO_NULLABILITY_MISMATCH", 42,
|
||||
("AVRO_NULLABILITY_MISMATCH", 41,
|
||||
"Field $0 is nullable in the file schema but not the table schema."),
|
||||
|
||||
("AVRO_NOT_A_RECORD", 43,
|
||||
("AVRO_NOT_A_RECORD", 42,
|
||||
"Inconsistent table metadata. Field $0 is not a record in the Avro schema."),
|
||||
|
||||
("PARQUET_DEF_LEVEL_ERROR", 44, "Could not read definition level, even though metadata"
|
||||
("PARQUET_DEF_LEVEL_ERROR", 43, "Could not read definition level, even though metadata"
|
||||
" states there are $0 values remaining in data page. file=$1"),
|
||||
|
||||
("PARQUET_NUM_COL_VALS_ERROR", 45, "Mismatched number of values in column index $0 "
|
||||
("PARQUET_NUM_COL_VALS_ERROR", 44, "Mismatched number of values in column index $0 "
|
||||
"($1 vs. $2). file=$3"),
|
||||
|
||||
("PARQUET_DICT_DECODE_FAILURE", 46, "File '$0' is corrupt: error decoding "
|
||||
("PARQUET_DICT_DECODE_FAILURE", 45, "File '$0' is corrupt: error decoding "
|
||||
"dictionary-encoded value of type $1 at offset $2"),
|
||||
|
||||
("SSL_PASSWORD_CMD_FAILED", 47,
|
||||
("SSL_PASSWORD_CMD_FAILED", 46,
|
||||
"SSL private-key password command ('$0') failed with error: $1"),
|
||||
|
||||
("SSL_CERTIFICATE_PATH_BLANK", 48, "The SSL certificate path is blank"),
|
||||
("SSL_PRIVATE_KEY_PATH_BLANK", 49, "The SSL private key path is blank"),
|
||||
("SSL_CERTIFICATE_PATH_BLANK", 47, "The SSL certificate path is blank"),
|
||||
("SSL_PRIVATE_KEY_PATH_BLANK", 48, "The SSL private key path is blank"),
|
||||
|
||||
("SSL_CERTIFICATE_NOT_FOUND", 50, "The SSL certificate file does not exist at path $0"),
|
||||
("SSL_PRIVATE_KEY_NOT_FOUND", 51, "The SSL private key file does not exist at path $0"),
|
||||
("SSL_CERTIFICATE_NOT_FOUND", 49, "The SSL certificate file does not exist at path $0"),
|
||||
("SSL_PRIVATE_KEY_NOT_FOUND", 50, "The SSL private key file does not exist at path $0"),
|
||||
|
||||
("SSL_SOCKET_CREATION_FAILED", 52, "SSL socket creation failed: $0"),
|
||||
("SSL_SOCKET_CREATION_FAILED", 51, "SSL socket creation failed: $0"),
|
||||
|
||||
("MEM_ALLOC_FAILED", 53, "Memory allocation of $0 bytes failed"),
|
||||
("MEM_ALLOC_FAILED", 52, "Memory allocation of $0 bytes failed"),
|
||||
|
||||
("PARQUET_REP_LEVEL_ERROR", 54, "Could not read repetition level, even though metadata"
|
||||
("PARQUET_REP_LEVEL_ERROR", 53, "Could not read repetition level, even though metadata"
|
||||
" states there are $0 values remaining in data page. file=$1"),
|
||||
|
||||
("PARQUET_UNRECOGNIZED_SCHEMA", 55, "File '$0' has an incompatible Parquet schema for "
|
||||
("PARQUET_UNRECOGNIZED_SCHEMA", 54, "File '$0' has an incompatible Parquet schema for "
|
||||
"column '$1'. Column type: $2, Parquet schema:\\n$3"),
|
||||
|
||||
("COLLECTION_ALLOC_FAILED", 56, "Failed to allocate $0 bytes for collection '$1'.\\n"
|
||||
("COLLECTION_ALLOC_FAILED", 55, "Failed to allocate $0 bytes for collection '$1'.\\n"
|
||||
"Current buffer size: $2 num tuples: $3."),
|
||||
|
||||
("TMP_DEVICE_BLACKLISTED", 57,
|
||||
("TMP_DEVICE_BLACKLISTED", 56,
|
||||
"Temporary device for directory $0 is blacklisted from a previous error and cannot "
|
||||
"be used."),
|
||||
|
||||
("TMP_FILE_BLACKLISTED", 58,
|
||||
("TMP_FILE_BLACKLISTED", 57,
|
||||
"Temporary file $0 is blacklisted from a previous error and cannot be expanded."),
|
||||
|
||||
("RPC_CLIENT_CONNECT_FAILURE", 59,
|
||||
("RPC_CLIENT_CONNECT_FAILURE", 58,
|
||||
"RPC client failed to connect: $0"),
|
||||
|
||||
("STALE_METADATA_FILE_TOO_SHORT", 60, "Metadata for file '$0' appears stale. "
|
||||
("STALE_METADATA_FILE_TOO_SHORT", 59, "Metadata for file '$0' appears stale. "
|
||||
"Try running \\\"refresh $1\\\" to reload the file metadata."),
|
||||
|
||||
("PARQUET_BAD_VERSION_NUMBER", 61, "File '$0' has an invalid version number: $1\\n"
|
||||
("PARQUET_BAD_VERSION_NUMBER", 60, "File '$0' has an invalid version number: $1\\n"
|
||||
"This could be due to stale metadata. Try running \\\"refresh $2\\\"."),
|
||||
|
||||
("SCANNER_INCOMPLETE_READ", 62, "Tried to read $0 bytes but could only read $1 bytes. "
|
||||
("SCANNER_INCOMPLETE_READ", 61, "Tried to read $0 bytes but could only read $1 bytes. "
|
||||
"This may indicate data file corruption. (file $2, byte offset: $3)"),
|
||||
|
||||
("SCANNER_INVALID_READ", 63, "Invalid read of $0 bytes. This may indicate data file "
|
||||
("SCANNER_INVALID_READ", 62, "Invalid read of $0 bytes. This may indicate data file "
|
||||
"corruption. (file $1, byte offset: $2)"),
|
||||
|
||||
("AVRO_BAD_VERSION_HEADER", 64, "File '$0' has an invalid version header: $1\\n"
|
||||
("AVRO_BAD_VERSION_HEADER", 63, "File '$0' has an invalid version header: $1\\n"
|
||||
"Make sure the file is an Avro data file."),
|
||||
|
||||
("UDF_MEM_LIMIT_EXCEEDED", 65, "$0's allocations exceeded memory limits."),
|
||||
("UDF_MEM_LIMIT_EXCEEDED", 64, "$0's allocations exceeded memory limits."),
|
||||
|
||||
("BTS_BLOCK_OVERFLOW", 66, "Cannot process row that is bigger than the IO size "
|
||||
("BTS_BLOCK_OVERFLOW", 65, "Cannot process row that is bigger than the IO size "
|
||||
"(row_size=$0, null_indicators_size=$1). To run this query, increase the IO size "
|
||||
"(--read_size option)."),
|
||||
|
||||
("COMPRESSED_FILE_MULTIPLE_BLOCKS", 67,
|
||||
("COMPRESSED_FILE_MULTIPLE_BLOCKS", 66,
|
||||
"For better performance, snappy-, gzip-, and bzip-compressed files "
|
||||
"should not be split into multiple HDFS blocks. file=$0 offset $1"),
|
||||
|
||||
("COMPRESSED_FILE_BLOCK_CORRUPTED", 68,
|
||||
("COMPRESSED_FILE_BLOCK_CORRUPTED", 67,
|
||||
"$0 Data error, likely data corrupted in this block."),
|
||||
|
||||
("COMPRESSED_FILE_DECOMPRESSOR_ERROR", 69, "$0 Decompressor error at $1, code=$2"),
|
||||
("COMPRESSED_FILE_DECOMPRESSOR_ERROR", 68, "$0 Decompressor error at $1, code=$2"),
|
||||
|
||||
("COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS", 70,
|
||||
("COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS", 69,
|
||||
"Decompression failed to make progress, but end of input is not reached. "
|
||||
"File appears corrupted. file=$0"),
|
||||
|
||||
("COMPRESSED_FILE_TRUNCATED", 71,
|
||||
("COMPRESSED_FILE_TRUNCATED", 70,
|
||||
"Unexpected end of compressed file. File may be truncated. file=$0"),
|
||||
|
||||
("DATASTREAM_SENDER_TIMEOUT", 72, "Sender timed out waiting for receiver fragment "
|
||||
("DATASTREAM_SENDER_TIMEOUT", 71, "Sender timed out waiting for receiver fragment "
|
||||
"instance: $0"),
|
||||
|
||||
("KUDU_IMPALA_TYPE_MISSING", 73, "Kudu type $0 is not available in Impala."),
|
||||
("KUDU_IMPALA_TYPE_MISSING", 72, "Kudu type $0 is not available in Impala."),
|
||||
|
||||
("IMPALA_KUDU_TYPE_MISSING", 74, "Impala type $0 is not available in Kudu."),
|
||||
("IMPALA_KUDU_TYPE_MISSING", 73, "Impala type $0 is not available in Kudu."),
|
||||
|
||||
("KUDU_NOT_SUPPORTED_ON_OS", 75, "Kudu is not supported on this operating system."),
|
||||
("KUDU_NOT_SUPPORTED_ON_OS", 74, "Kudu is not supported on this operating system."),
|
||||
|
||||
("KUDU_NOT_ENABLED", 76, "Kudu features are disabled by the startup flag "
|
||||
("KUDU_NOT_ENABLED", 75, "Kudu features are disabled by the startup flag "
|
||||
"--disable_kudu."),
|
||||
|
||||
("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 77, "Cannot perform hash join at node with "
|
||||
("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 76, "Cannot perform hash join at node with "
|
||||
"id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
|
||||
"level $1. Number of rows $2."),
|
||||
|
||||
("PARTITIONED_AGG_REPARTITION_FAILS", 78, "Cannot perform aggregation at node with "
|
||||
("PARTITIONED_AGG_REPARTITION_FAILS", 77, "Cannot perform aggregation at node with "
|
||||
"id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
|
||||
"level $1. Number of rows $2."),
|
||||
|
||||
("AVRO_TRUNCATED_BLOCK", 79, "File '$0' is corrupt: truncated data block at offset $1"),
|
||||
("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at offset $1"),
|
||||
|
||||
("AVRO_INVALID_UNION", 80, "File '$0' is corrupt: invalid union value $1 at offset $2"),
|
||||
("AVRO_INVALID_UNION", 79, "File '$0' is corrupt: invalid union value $1 at offset $2"),
|
||||
|
||||
("AVRO_INVALID_BOOLEAN", 81, "File '$0' is corrupt: invalid boolean value $1 at offset "
|
||||
("AVRO_INVALID_BOOLEAN", 80, "File '$0' is corrupt: invalid boolean value $1 at offset "
|
||||
"$2"),
|
||||
|
||||
("AVRO_INVALID_LENGTH", 82, "File '$0' is corrupt: invalid length $1 at offset $2"),
|
||||
("AVRO_INVALID_LENGTH", 81, "File '$0' is corrupt: invalid length $1 at offset $2"),
|
||||
|
||||
("SCANNER_INVALID_INT", 83, "File '$0' is corrupt: invalid encoded integer at offset $1"),
|
||||
("SCANNER_INVALID_INT", 82, "File '$0' is corrupt: invalid encoded integer at offset $1"),
|
||||
|
||||
("AVRO_INVALID_RECORD_COUNT", 84, "File '$0' is corrupt: invalid record count $1 at "
|
||||
("AVRO_INVALID_RECORD_COUNT", 83, "File '$0' is corrupt: invalid record count $1 at "
|
||||
"offset $2"),
|
||||
|
||||
("AVRO_INVALID_COMPRESSED_SIZE", 85, "File '$0' is corrupt: invalid compressed block "
|
||||
("AVRO_INVALID_COMPRESSED_SIZE", 84, "File '$0' is corrupt: invalid compressed block "
|
||||
"size $1 at offset $2"),
|
||||
|
||||
("AVRO_INVALID_METADATA_COUNT", 86, "File '$0' is corrupt: invalid metadata count $1 "
|
||||
("AVRO_INVALID_METADATA_COUNT", 85, "File '$0' is corrupt: invalid metadata count $1 "
|
||||
"at offset $2"),
|
||||
|
||||
("SCANNER_STRING_LENGTH_OVERFLOW", 87, "File '$0' could not be read: string $1 was "
|
||||
("SCANNER_STRING_LENGTH_OVERFLOW", 86, "File '$0' could not be read: string $1 was "
|
||||
"longer than supported limit of $2 bytes at offset $3"),
|
||||
|
||||
("PARQUET_CORRUPT_PLAIN_VALUE", 88, "File '$0' is corrupt: error decoding value of type "
|
||||
("PARQUET_CORRUPT_PLAIN_VALUE", 87, "File '$0' is corrupt: error decoding value of type "
|
||||
"$1 at offset $2"),
|
||||
|
||||
("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
|
||||
("PARQUET_CORRUPT_DICTIONARY", 88, "File '$0' is corrupt: error reading dictionary for "
|
||||
"data of type $1: $2"),
|
||||
|
||||
("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
|
||||
("TEXT_PARSER_TRUNCATED_COLUMN", 89, "Length of column is $0 which exceeds maximum "
|
||||
"supported length of 2147483647 bytes."),
|
||||
|
||||
("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query "
|
||||
("SCRATCH_LIMIT_EXCEEDED", 90, "Scratch space limit of $0 bytes exceeded for query "
|
||||
"while spilling data to disk."),
|
||||
|
||||
("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."),
|
||||
("BUFFER_ALLOCATION_FAILED", 91, "Unexpected error allocating $0 byte buffer."),
|
||||
|
||||
("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 93, "File '$0' is corrupt: metadata indicates "
|
||||
("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 92, "File '$0' is corrupt: metadata indicates "
|
||||
"a zero row count but there is at least one non-empty row group."),
|
||||
|
||||
("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends "
|
||||
("NO_REGISTERED_BACKENDS", 93, "Cannot schedule query: no registered backends "
|
||||
"available."),
|
||||
|
||||
("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
|
||||
("KUDU_KEY_ALREADY_PRESENT", 94, "Key already present in Kudu table '$0'."),
|
||||
|
||||
("KUDU_NOT_FOUND", 96, "Not found in Kudu table '$0': $1"),
|
||||
("KUDU_NOT_FOUND", 95, "Not found in Kudu table '$0': $1"),
|
||||
|
||||
("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1"),
|
||||
("KUDU_SESSION_ERROR", 96, "Error in Kudu table '$0': $1"),
|
||||
|
||||
("AVRO_UNSUPPORTED_TYPE", 98, "Column '$0': unsupported Avro type '$1'"),
|
||||
("AVRO_UNSUPPORTED_TYPE", 97, "Column '$0': unsupported Avro type '$1'"),
|
||||
|
||||
("AVRO_INVALID_DECIMAL", 99,
|
||||
("AVRO_INVALID_DECIMAL", 98,
|
||||
"Column '$0': invalid Avro decimal type with precision = '$1' scale = '$2'"),
|
||||
|
||||
("KUDU_NULL_CONSTRAINT_VIOLATION", 100,
|
||||
("KUDU_NULL_CONSTRAINT_VIOLATION", 99,
|
||||
"Row with null value violates nullability constraint on table '$0'."),
|
||||
|
||||
("PARQUET_TIMESTAMP_OUT_OF_RANGE", 101,
|
||||
("PARQUET_TIMESTAMP_OUT_OF_RANGE", 100,
|
||||
"Parquet file '$0' column '$1' contains an out of range timestamp. "
|
||||
"The valid date range is 1400-01-01..9999-12-31."),
|
||||
)
|
||||
|
||||
@@ -17,12 +17,16 @@
|
||||
|
||||
package org.apache.impala.planner;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
||||
import org.apache.impala.analysis.Analyzer;
|
||||
import org.apache.impala.analysis.Expr;
|
||||
import org.apache.impala.analysis.SlotDescriptor;
|
||||
@@ -35,6 +39,8 @@ import org.apache.impala.catalog.HdfsPartition;
|
||||
import org.apache.impala.catalog.HdfsPartition.FileBlock;
|
||||
import org.apache.impala.catalog.HdfsTable;
|
||||
import org.apache.impala.catalog.Type;
|
||||
import org.apache.impala.common.FileSystemUtil;
|
||||
import org.apache.impala.common.ImpalaRuntimeException;
|
||||
import org.apache.impala.common.ImpalaException;
|
||||
import org.apache.impala.common.NotImplementedException;
|
||||
import org.apache.impala.common.PrintUtils;
|
||||
@@ -122,6 +128,14 @@ public class HdfsScanNode extends ScanNode {
|
||||
// to values > 0 for hdfs text files.
|
||||
private int skipHeaderLineCount_ = 0;
|
||||
|
||||
// Number of scan-ranges/files/partitions that have missing disk ids. Reported in the
|
||||
// explain plan.
|
||||
private int numScanRangesNoDiskIds_ = 0;
|
||||
private int numFilesNoDiskIds_ = 0;
|
||||
private int numPartitionsNoDiskIds_ = 0;
|
||||
|
||||
private static final Configuration CONF = new Configuration();
|
||||
|
||||
/**
|
||||
* Construct a node to scan given data files into tuples described by 'desc',
|
||||
* with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
|
||||
@@ -308,7 +322,8 @@ public class HdfsScanNode extends ScanNode {
|
||||
* ids, based on the given maximum number of bytes each scan range should scan.
|
||||
* Returns the set of file formats being scanned.
|
||||
*/
|
||||
private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) {
|
||||
private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer)
|
||||
throws ImpalaRuntimeException {
|
||||
long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options()
|
||||
.getMax_scan_range_length();
|
||||
scanRanges_ = Lists.newArrayList();
|
||||
@@ -316,7 +331,18 @@ public class HdfsScanNode extends ScanNode {
|
||||
for (HdfsPartition partition: partitions_) {
|
||||
fileFormats.add(partition.getFileFormat());
|
||||
Preconditions.checkState(partition.getId() >= 0);
|
||||
// Missing disk id accounting is only done for file systems that support the notion
|
||||
// of disk/storage ids.
|
||||
FileSystem partitionFs;
|
||||
try {
|
||||
partitionFs = partition.getLocationPath().getFileSystem(CONF);
|
||||
} catch (IOException e) {
|
||||
throw new ImpalaRuntimeException("Error determining partition fs type", e);
|
||||
}
|
||||
boolean checkMissingDiskIds = FileSystemUtil.supportsStorageIds(partitionFs);
|
||||
boolean partitionMissingDiskIds = false;
|
||||
for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
|
||||
boolean fileDescMissingDiskIds = false;
|
||||
for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
|
||||
HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
|
||||
List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
|
||||
@@ -337,6 +363,11 @@ public class HdfsScanNode extends ScanNode {
|
||||
// Translate from network address to the global (to this request) host index.
|
||||
Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
|
||||
location.setHost_idx(globalHostIdx);
|
||||
if (checkMissingDiskIds && block.getDiskId(i) == -1) {
|
||||
++numScanRangesNoDiskIds_;
|
||||
partitionMissingDiskIds = true;
|
||||
fileDescMissingDiskIds = true;
|
||||
}
|
||||
location.setVolume_id(block.getDiskId(i));
|
||||
location.setIs_cached(block.isCached(i));
|
||||
locations.add(location);
|
||||
@@ -362,7 +393,15 @@ public class HdfsScanNode extends ScanNode {
|
||||
currentOffset += currentLength;
|
||||
}
|
||||
}
|
||||
if (fileDescMissingDiskIds) {
|
||||
++numFilesNoDiskIds_;
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("File blocks mapping to unknown disk ids. Dir: " +
|
||||
partition.getLocation() + " File:" + fileDesc.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
|
||||
}
|
||||
return fileFormats;
|
||||
}
|
||||
@@ -554,13 +593,13 @@ public class HdfsScanNode extends ScanNode {
|
||||
HdfsTable table = (HdfsTable) desc_.getTable();
|
||||
output.append(String.format("%s%s [%s", prefix, getDisplayLabel(),
|
||||
getDisplayLabelDetail()));
|
||||
int numPartitions = partitions_.size();
|
||||
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
|
||||
fragment_.isPartitioned()) {
|
||||
output.append(", " + fragment_.getDataPartition().getExplainString());
|
||||
}
|
||||
output.append("]\n");
|
||||
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
|
||||
int numPartitions = partitions_.size();
|
||||
if (tbl_.getNumClusteringCols() == 0) numPartitions = 1;
|
||||
output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix,
|
||||
numPartitions, table.getPartitions().size() - 1, totalFiles_,
|
||||
@@ -586,6 +625,12 @@ public class HdfsScanNode extends ScanNode {
|
||||
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
|
||||
output.append(getStatsExplainString(detailPrefix, detailLevel));
|
||||
output.append("\n");
|
||||
if (numScanRangesNoDiskIds_ > 0) {
|
||||
output.append(String.format("%smissing disk ids: " +
|
||||
"partitions=%s/%s files=%s/%s scan ranges %s/%s\n", detailPrefix,
|
||||
numPartitionsNoDiskIds_, numPartitions, numFilesNoDiskIds_,
|
||||
totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
|
||||
}
|
||||
}
|
||||
return output.toString();
|
||||
}
|
||||
@@ -663,4 +708,6 @@ public class HdfsScanNode extends ScanNode {
|
||||
|
||||
@Override
|
||||
public boolean hasCorruptTableStats() { return hasCorruptTableStats_; }
|
||||
|
||||
public boolean hasMissingDiskIds() { return numScanRangesNoDiskIds_ > 0; }
|
||||
}
|
||||
|
||||
@@ -304,6 +304,16 @@ public class Planner {
|
||||
hasHeader = true;
|
||||
}
|
||||
|
||||
if (request.query_ctx.isSetTables_missing_diskids()) {
|
||||
List<String> tableNames = Lists.newArrayList();
|
||||
for (TTableName tableName: request.query_ctx.getTables_missing_diskids()) {
|
||||
tableNames.add(tableName.db_name + "." + tableName.table_name);
|
||||
}
|
||||
str.append("WARNING: The following tables have scan ranges with missing " +
|
||||
"disk id information.\n" + Joiner.on(", ").join(tableNames) + "\n");
|
||||
hasHeader = true;
|
||||
}
|
||||
|
||||
if (request.query_ctx.isDisable_spilling()) {
|
||||
str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" +
|
||||
"Reason: Query option disable_unsafe_spills is set, at least one table\n" +
|
||||
|
||||
@@ -86,6 +86,7 @@ import org.apache.impala.common.FileSystemUtil;
|
||||
import org.apache.impala.common.ImpalaException;
|
||||
import org.apache.impala.common.InternalException;
|
||||
import org.apache.impala.common.NotImplementedException;
|
||||
import org.apache.impala.planner.HdfsScanNode;
|
||||
import org.apache.impala.planner.PlanFragment;
|
||||
import org.apache.impala.planner.Planner;
|
||||
import org.apache.impala.planner.ScanNode;
|
||||
@@ -947,14 +948,17 @@ public class Frontend {
|
||||
LOG.trace("get scan range locations");
|
||||
Set<TTableName> tablesMissingStats = Sets.newTreeSet();
|
||||
Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
|
||||
Set<TTableName> tablesWithMissingDiskIds = Sets.newTreeSet();
|
||||
for (ScanNode scanNode: scanNodes) {
|
||||
result.putToPer_node_scan_ranges(
|
||||
scanNode.getId().asInt(), scanNode.getScanRangeLocations());
|
||||
if (scanNode.isTableMissingStats()) {
|
||||
tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
|
||||
}
|
||||
if (scanNode.hasCorruptTableStats()) {
|
||||
tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
|
||||
|
||||
TTableName tableName = scanNode.getTupleDesc().getTableName().toThrift();
|
||||
if (scanNode.isTableMissingStats()) tablesMissingStats.add(tableName);
|
||||
if (scanNode.hasCorruptTableStats()) tablesWithCorruptStats.add(tableName);
|
||||
if (scanNode instanceof HdfsScanNode &&
|
||||
((HdfsScanNode) scanNode).hasMissingDiskIds()) {
|
||||
tablesWithMissingDiskIds.add(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -964,6 +968,9 @@ public class Frontend {
|
||||
for (TTableName tableName: tablesWithCorruptStats) {
|
||||
queryCtx.addToTables_with_corrupt_stats(tableName);
|
||||
}
|
||||
for (TTableName tableName: tablesWithMissingDiskIds) {
|
||||
queryCtx.addToTables_missing_diskids(tableName);
|
||||
}
|
||||
|
||||
// Compute resource requirements after scan range locations because the cost
|
||||
// estimates of scan nodes rely on them.
|
||||
|
||||
Reference in New Issue
Block a user