IMPALA-1427: Improvements to "Unknown disk-ID" warning

- Removes the runtime unknown disk ID reporting and instead moves
  it to the explain plan as a counter that prints the number of
  scan ranges missing disk IDs in the corresponding HDFS scan nodes.

- Adds a warning to the header of query profile/explain plan with a
  list of tables missing disk ids.

- Removes reference to enabling dfs block metadata configuration,
  since it doesn't apply anymore.

- Removes VolumeId terminology from the runtime profile.

Change-Id: Iddb132ff7ad66f3291b93bf9d8061bd0525ef1b2
Reviewed-on: http://gerrit.cloudera.org:8080/5828
Reviewed-by: Bharath Vissapragada <bharathv@cloudera.com>
Tested-by: Impala Public Jenkins
This commit is contained in:
Bharath Vissapragada
2017-01-30 10:37:41 -08:00
committed by Impala Public Jenkins
parent d074f71d81
commit fcc2d817b8
8 changed files with 166 additions and 102 deletions

View File

@@ -59,9 +59,9 @@
DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, "
"that a scan node will wait for expected runtime filters to arrive.");
DEFINE_bool(suppress_unknown_disk_id_warnings, false,
"Suppress unknown disk id warnings generated when the HDFS implementation does not"
" provide volume/disk information.");
// TODO: Remove this flag in a compatibility-breaking release.
DEFINE_bool(suppress_unknown_disk_id_warnings, false, "Deprecated.");
#ifndef NDEBUG
DECLARE_bool(skip_file_runtime_filtering);
@@ -89,7 +89,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
reader_context_(NULL),
tuple_desc_(NULL),
hdfs_table_(NULL),
unknown_disk_id_warned_(false),
initial_ranges_issued_(false),
counters_running_(false),
max_compressed_text_file_length_(NULL),
@@ -263,14 +262,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
}
bool expected_local = params.__isset.is_remote && !params.is_remote;
if (expected_local && params.volume_id == -1) {
if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) {
runtime_profile()->AppendExecOption("Missing Volume Id");
runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK));
unknown_disk_id_warned_ = true;
}
++num_ranges_missing_volume_id;
}
if (expected_local && params.volume_id == -1) ++num_ranges_missing_volume_id;
bool try_cache = params.is_cached;
if (runtime_state_->query_options().disable_cached_reads) {

View File

@@ -305,10 +305,6 @@ class HdfsScanNodeBase : public ScanNode {
/// The root of the table's Avro schema, if we're scanning an Avro table.
ScopedAvroSchemaElement avro_schema_;
/// If true, the warning that some disk ids are unknown was logged. Only log this once
/// per scan node since it can be noisy.
bool unknown_disk_id_warned_;
/// Partitions scanned by this scan node.
std::unordered_set<int64_t> partition_ids_;

View File

@@ -60,6 +60,7 @@ static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores";
static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
ImpalaServer::QueryExecState::QueryExecState(const TQueryCtx& query_ctx,
ExecEnv* exec_env, Frontend* frontend, ImpalaServer* server,
@@ -428,6 +429,18 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
}
if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
!query_exec_request.query_ctx.tables_missing_diskids.empty()) {
stringstream ss;
const vector<TTableName>& tbls =
query_exec_request.query_ctx.tables_missing_diskids;
for (int i = 0; i < tbls.size(); ++i) {
if (i != 0) ss << ",";
ss << tbls[i].db_name << "." << tbls[i].table_name;
}
summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
}
{
lock_guard<mutex> l(lock_);
// Don't start executing the query if Cancel() was called concurrently with Exec().

View File

@@ -322,6 +322,9 @@ struct TQueryCtx {
// backend in NativeEvalExprsWithoutRow() in FESupport. This flag is only advisory to
// avoid the overhead of codegen and can be ignored if codegen is needed functionally.
14: optional bool disable_codegen_hint = false;
// List of tables with scan ranges that map to blocks with missing disk IDs.
15: optional list<CatalogObjects.TTableName> tables_missing_diskids
}
// Context to collect information, which is shared among all instances of that plan

View File

@@ -94,220 +94,216 @@ error_codes = (
("SNAPPY_DECOMPRESS_DECOMPRESS_SIZE_INCORRECT", 25,
"Snappy: Decompressed size is not correct."),
("HDFS_SCAN_NODE_UNKNOWN_DISK", 26, "Unknown disk id. "
"This will negatively affect performance. "
"Check your hdfs settings to enable block location metadata."),
("FRAGMENT_EXECUTOR", 27, "Reserved resource size ($0) is larger than "
("FRAGMENT_EXECUTOR", 26, "Reserved resource size ($0) is larger than "
"query mem limit ($1), and will be restricted to $1. Configure the reservation "
"size by setting RM_INITIAL_MEM."),
("PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH", 28,
("PARTITIONED_HASH_JOIN_MAX_PARTITION_DEPTH", 27,
"Cannot perform join at hash join node with id $0."
" The input data was partitioned the maximum number of $1 times."
" This could mean there is significant skew in the data or the memory limit is"
" set too low."),
("PARTITIONED_AGG_MAX_PARTITION_DEPTH", 29,
("PARTITIONED_AGG_MAX_PARTITION_DEPTH", 28,
"Cannot perform aggregation at hash aggregation node with id $0."
" The input data was partitioned the maximum number of $1 times."
" This could mean there is significant skew in the data or the memory limit is"
" set too low."),
("MISSING_BUILTIN", 30, "Builtin '$0' with symbol '$1' does not exist. "
("MISSING_BUILTIN", 29, "Builtin '$0' with symbol '$1' does not exist. "
"Verify that all your impalads are the same version."),
("RPC_GENERAL_ERROR", 31, "RPC Error: $0"),
("RPC_RECV_TIMEOUT", 32, "RPC recv timed out: $0"),
("RPC_GENERAL_ERROR", 30, "RPC Error: $0"),
("RPC_RECV_TIMEOUT", 31, "RPC recv timed out: $0"),
("UDF_VERIFY_FAILED", 33,
("UDF_VERIFY_FAILED", 32,
"Failed to verify function $0 from LLVM module $1, see log for more details."),
("PARQUET_CORRUPT_RLE_BYTES", 34, "File $0 corrupt. RLE level data bytes = $1"),
("PARQUET_CORRUPT_RLE_BYTES", 33, "File $0 corrupt. RLE level data bytes = $1"),
("AVRO_DECIMAL_RESOLUTION_ERROR", 35, "Column '$0' has conflicting Avro decimal types. "
("AVRO_DECIMAL_RESOLUTION_ERROR", 34, "Column '$0' has conflicting Avro decimal types. "
"Table schema $1: $2, file schema $1: $3"),
("AVRO_DECIMAL_METADATA_MISMATCH", 36, "Column '$0' has conflicting Avro decimal types. "
("AVRO_DECIMAL_METADATA_MISMATCH", 35, "Column '$0' has conflicting Avro decimal types. "
"Declared $1: $2, $1 in table's Avro schema: $3"),
("AVRO_SCHEMA_RESOLUTION_ERROR", 37, "Unresolvable types for column '$0': "
("AVRO_SCHEMA_RESOLUTION_ERROR", 36, "Unresolvable types for column '$0': "
"table type: $1, file type: $2"),
("AVRO_SCHEMA_METADATA_MISMATCH", 38, "Unresolvable types for column '$0': "
("AVRO_SCHEMA_METADATA_MISMATCH", 37, "Unresolvable types for column '$0': "
"declared column type: $1, table's Avro schema type: $2"),
("AVRO_UNSUPPORTED_DEFAULT_VALUE", 39, "Field $0 is missing from file and default "
("AVRO_UNSUPPORTED_DEFAULT_VALUE", 38, "Field $0 is missing from file and default "
"values of type $1 are not yet supported."),
("AVRO_MISSING_FIELD", 40, "Inconsistent table metadata. Mismatch between column "
("AVRO_MISSING_FIELD", 39, "Inconsistent table metadata. Mismatch between column "
"definition and Avro schema: cannot read field $0 because there are only $1 fields."),
("AVRO_MISSING_DEFAULT", 41,
("AVRO_MISSING_DEFAULT", 40,
"Field $0 is missing from file and does not have a default value."),
("AVRO_NULLABILITY_MISMATCH", 42,
("AVRO_NULLABILITY_MISMATCH", 41,
"Field $0 is nullable in the file schema but not the table schema."),
("AVRO_NOT_A_RECORD", 43,
("AVRO_NOT_A_RECORD", 42,
"Inconsistent table metadata. Field $0 is not a record in the Avro schema."),
("PARQUET_DEF_LEVEL_ERROR", 44, "Could not read definition level, even though metadata"
("PARQUET_DEF_LEVEL_ERROR", 43, "Could not read definition level, even though metadata"
" states there are $0 values remaining in data page. file=$1"),
("PARQUET_NUM_COL_VALS_ERROR", 45, "Mismatched number of values in column index $0 "
("PARQUET_NUM_COL_VALS_ERROR", 44, "Mismatched number of values in column index $0 "
"($1 vs. $2). file=$3"),
("PARQUET_DICT_DECODE_FAILURE", 46, "File '$0' is corrupt: error decoding "
("PARQUET_DICT_DECODE_FAILURE", 45, "File '$0' is corrupt: error decoding "
"dictionary-encoded value of type $1 at offset $2"),
("SSL_PASSWORD_CMD_FAILED", 47,
("SSL_PASSWORD_CMD_FAILED", 46,
"SSL private-key password command ('$0') failed with error: $1"),
("SSL_CERTIFICATE_PATH_BLANK", 48, "The SSL certificate path is blank"),
("SSL_PRIVATE_KEY_PATH_BLANK", 49, "The SSL private key path is blank"),
("SSL_CERTIFICATE_PATH_BLANK", 47, "The SSL certificate path is blank"),
("SSL_PRIVATE_KEY_PATH_BLANK", 48, "The SSL private key path is blank"),
("SSL_CERTIFICATE_NOT_FOUND", 50, "The SSL certificate file does not exist at path $0"),
("SSL_PRIVATE_KEY_NOT_FOUND", 51, "The SSL private key file does not exist at path $0"),
("SSL_CERTIFICATE_NOT_FOUND", 49, "The SSL certificate file does not exist at path $0"),
("SSL_PRIVATE_KEY_NOT_FOUND", 50, "The SSL private key file does not exist at path $0"),
("SSL_SOCKET_CREATION_FAILED", 52, "SSL socket creation failed: $0"),
("SSL_SOCKET_CREATION_FAILED", 51, "SSL socket creation failed: $0"),
("MEM_ALLOC_FAILED", 53, "Memory allocation of $0 bytes failed"),
("MEM_ALLOC_FAILED", 52, "Memory allocation of $0 bytes failed"),
("PARQUET_REP_LEVEL_ERROR", 54, "Could not read repetition level, even though metadata"
("PARQUET_REP_LEVEL_ERROR", 53, "Could not read repetition level, even though metadata"
" states there are $0 values remaining in data page. file=$1"),
("PARQUET_UNRECOGNIZED_SCHEMA", 55, "File '$0' has an incompatible Parquet schema for "
("PARQUET_UNRECOGNIZED_SCHEMA", 54, "File '$0' has an incompatible Parquet schema for "
"column '$1'. Column type: $2, Parquet schema:\\n$3"),
("COLLECTION_ALLOC_FAILED", 56, "Failed to allocate $0 bytes for collection '$1'.\\n"
("COLLECTION_ALLOC_FAILED", 55, "Failed to allocate $0 bytes for collection '$1'.\\n"
"Current buffer size: $2 num tuples: $3."),
("TMP_DEVICE_BLACKLISTED", 57,
("TMP_DEVICE_BLACKLISTED", 56,
"Temporary device for directory $0 is blacklisted from a previous error and cannot "
"be used."),
("TMP_FILE_BLACKLISTED", 58,
("TMP_FILE_BLACKLISTED", 57,
"Temporary file $0 is blacklisted from a previous error and cannot be expanded."),
("RPC_CLIENT_CONNECT_FAILURE", 59,
("RPC_CLIENT_CONNECT_FAILURE", 58,
"RPC client failed to connect: $0"),
("STALE_METADATA_FILE_TOO_SHORT", 60, "Metadata for file '$0' appears stale. "
("STALE_METADATA_FILE_TOO_SHORT", 59, "Metadata for file '$0' appears stale. "
"Try running \\\"refresh $1\\\" to reload the file metadata."),
("PARQUET_BAD_VERSION_NUMBER", 61, "File '$0' has an invalid version number: $1\\n"
("PARQUET_BAD_VERSION_NUMBER", 60, "File '$0' has an invalid version number: $1\\n"
"This could be due to stale metadata. Try running \\\"refresh $2\\\"."),
("SCANNER_INCOMPLETE_READ", 62, "Tried to read $0 bytes but could only read $1 bytes. "
("SCANNER_INCOMPLETE_READ", 61, "Tried to read $0 bytes but could only read $1 bytes. "
"This may indicate data file corruption. (file $2, byte offset: $3)"),
("SCANNER_INVALID_READ", 63, "Invalid read of $0 bytes. This may indicate data file "
("SCANNER_INVALID_READ", 62, "Invalid read of $0 bytes. This may indicate data file "
"corruption. (file $1, byte offset: $2)"),
("AVRO_BAD_VERSION_HEADER", 64, "File '$0' has an invalid version header: $1\\n"
("AVRO_BAD_VERSION_HEADER", 63, "File '$0' has an invalid version header: $1\\n"
"Make sure the file is an Avro data file."),
("UDF_MEM_LIMIT_EXCEEDED", 65, "$0's allocations exceeded memory limits."),
("UDF_MEM_LIMIT_EXCEEDED", 64, "$0's allocations exceeded memory limits."),
("BTS_BLOCK_OVERFLOW", 66, "Cannot process row that is bigger than the IO size "
("BTS_BLOCK_OVERFLOW", 65, "Cannot process row that is bigger than the IO size "
"(row_size=$0, null_indicators_size=$1). To run this query, increase the IO size "
"(--read_size option)."),
("COMPRESSED_FILE_MULTIPLE_BLOCKS", 67,
("COMPRESSED_FILE_MULTIPLE_BLOCKS", 66,
"For better performance, snappy-, gzip-, and bzip-compressed files "
"should not be split into multiple HDFS blocks. file=$0 offset $1"),
("COMPRESSED_FILE_BLOCK_CORRUPTED", 68,
("COMPRESSED_FILE_BLOCK_CORRUPTED", 67,
"$0 Data error, likely data corrupted in this block."),
("COMPRESSED_FILE_DECOMPRESSOR_ERROR", 69, "$0 Decompressor error at $1, code=$2"),
("COMPRESSED_FILE_DECOMPRESSOR_ERROR", 68, "$0 Decompressor error at $1, code=$2"),
("COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS", 70,
("COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS", 69,
"Decompression failed to make progress, but end of input is not reached. "
"File appears corrupted. file=$0"),
("COMPRESSED_FILE_TRUNCATED", 71,
("COMPRESSED_FILE_TRUNCATED", 70,
"Unexpected end of compressed file. File may be truncated. file=$0"),
("DATASTREAM_SENDER_TIMEOUT", 72, "Sender timed out waiting for receiver fragment "
("DATASTREAM_SENDER_TIMEOUT", 71, "Sender timed out waiting for receiver fragment "
"instance: $0"),
("KUDU_IMPALA_TYPE_MISSING", 73, "Kudu type $0 is not available in Impala."),
("KUDU_IMPALA_TYPE_MISSING", 72, "Kudu type $0 is not available in Impala."),
("IMPALA_KUDU_TYPE_MISSING", 74, "Impala type $0 is not available in Kudu."),
("IMPALA_KUDU_TYPE_MISSING", 73, "Impala type $0 is not available in Kudu."),
("KUDU_NOT_SUPPORTED_ON_OS", 75, "Kudu is not supported on this operating system."),
("KUDU_NOT_SUPPORTED_ON_OS", 74, "Kudu is not supported on this operating system."),
("KUDU_NOT_ENABLED", 76, "Kudu features are disabled by the startup flag "
("KUDU_NOT_ENABLED", 75, "Kudu features are disabled by the startup flag "
"--disable_kudu."),
("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 77, "Cannot perform hash join at node with "
("PARTITIONED_HASH_JOIN_REPARTITION_FAILS", 76, "Cannot perform hash join at node with "
"id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
"level $1. Number of rows $2."),
("PARTITIONED_AGG_REPARTITION_FAILS", 78, "Cannot perform aggregation at node with "
("PARTITIONED_AGG_REPARTITION_FAILS", 77, "Cannot perform aggregation at node with "
"id $0. Repartitioning did not reduce the size of a spilled partition. Repartitioning "
"level $1. Number of rows $2."),
("AVRO_TRUNCATED_BLOCK", 79, "File '$0' is corrupt: truncated data block at offset $1"),
("AVRO_TRUNCATED_BLOCK", 78, "File '$0' is corrupt: truncated data block at offset $1"),
("AVRO_INVALID_UNION", 80, "File '$0' is corrupt: invalid union value $1 at offset $2"),
("AVRO_INVALID_UNION", 79, "File '$0' is corrupt: invalid union value $1 at offset $2"),
("AVRO_INVALID_BOOLEAN", 81, "File '$0' is corrupt: invalid boolean value $1 at offset "
("AVRO_INVALID_BOOLEAN", 80, "File '$0' is corrupt: invalid boolean value $1 at offset "
"$2"),
("AVRO_INVALID_LENGTH", 82, "File '$0' is corrupt: invalid length $1 at offset $2"),
("AVRO_INVALID_LENGTH", 81, "File '$0' is corrupt: invalid length $1 at offset $2"),
("SCANNER_INVALID_INT", 83, "File '$0' is corrupt: invalid encoded integer at offset $1"),
("SCANNER_INVALID_INT", 82, "File '$0' is corrupt: invalid encoded integer at offset $1"),
("AVRO_INVALID_RECORD_COUNT", 84, "File '$0' is corrupt: invalid record count $1 at "
("AVRO_INVALID_RECORD_COUNT", 83, "File '$0' is corrupt: invalid record count $1 at "
"offset $2"),
("AVRO_INVALID_COMPRESSED_SIZE", 85, "File '$0' is corrupt: invalid compressed block "
("AVRO_INVALID_COMPRESSED_SIZE", 84, "File '$0' is corrupt: invalid compressed block "
"size $1 at offset $2"),
("AVRO_INVALID_METADATA_COUNT", 86, "File '$0' is corrupt: invalid metadata count $1 "
("AVRO_INVALID_METADATA_COUNT", 85, "File '$0' is corrupt: invalid metadata count $1 "
"at offset $2"),
("SCANNER_STRING_LENGTH_OVERFLOW", 87, "File '$0' could not be read: string $1 was "
("SCANNER_STRING_LENGTH_OVERFLOW", 86, "File '$0' could not be read: string $1 was "
"longer than supported limit of $2 bytes at offset $3"),
("PARQUET_CORRUPT_PLAIN_VALUE", 88, "File '$0' is corrupt: error decoding value of type "
("PARQUET_CORRUPT_PLAIN_VALUE", 87, "File '$0' is corrupt: error decoding value of type "
"$1 at offset $2"),
("PARQUET_CORRUPT_DICTIONARY", 89, "File '$0' is corrupt: error reading dictionary for "
("PARQUET_CORRUPT_DICTIONARY", 88, "File '$0' is corrupt: error reading dictionary for "
"data of type $1: $2"),
("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
("TEXT_PARSER_TRUNCATED_COLUMN", 89, "Length of column is $0 which exceeds maximum "
"supported length of 2147483647 bytes."),
("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query "
("SCRATCH_LIMIT_EXCEEDED", 90, "Scratch space limit of $0 bytes exceeded for query "
"while spilling data to disk."),
("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."),
("BUFFER_ALLOCATION_FAILED", 91, "Unexpected error allocating $0 byte buffer."),
("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 93, "File '$0' is corrupt: metadata indicates "
("PARQUET_ZERO_ROWS_IN_NON_EMPTY_FILE", 92, "File '$0' is corrupt: metadata indicates "
"a zero row count but there is at least one non-empty row group."),
("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends "
("NO_REGISTERED_BACKENDS", 93, "Cannot schedule query: no registered backends "
"available."),
("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
("KUDU_KEY_ALREADY_PRESENT", 94, "Key already present in Kudu table '$0'."),
("KUDU_NOT_FOUND", 96, "Not found in Kudu table '$0': $1"),
("KUDU_NOT_FOUND", 95, "Not found in Kudu table '$0': $1"),
("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1"),
("KUDU_SESSION_ERROR", 96, "Error in Kudu table '$0': $1"),
("AVRO_UNSUPPORTED_TYPE", 98, "Column '$0': unsupported Avro type '$1'"),
("AVRO_UNSUPPORTED_TYPE", 97, "Column '$0': unsupported Avro type '$1'"),
("AVRO_INVALID_DECIMAL", 99,
("AVRO_INVALID_DECIMAL", 98,
"Column '$0': invalid Avro decimal type with precision = '$1' scale = '$2'"),
("KUDU_NULL_CONSTRAINT_VIOLATION", 100,
("KUDU_NULL_CONSTRAINT_VIOLATION", 99,
"Row with null value violates nullability constraint on table '$0'."),
("PARQUET_TIMESTAMP_OUT_OF_RANGE", 101,
("PARQUET_TIMESTAMP_OUT_OF_RANGE", 100,
"Parquet file '$0' column '$1' contains an out of range timestamp. "
"The valid date range is 1400-01-01..9999-12-31."),
)

View File

@@ -17,12 +17,16 @@
package org.apache.impala.planner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.SlotDescriptor;
@@ -35,6 +39,8 @@ import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.HdfsPartition.FileBlock;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.common.PrintUtils;
@@ -122,6 +128,14 @@ public class HdfsScanNode extends ScanNode {
// to values > 0 for hdfs text files.
private int skipHeaderLineCount_ = 0;
// Number of scan-ranges/files/partitions that have missing disk ids. Reported in the
// explain plan.
private int numScanRangesNoDiskIds_ = 0;
private int numFilesNoDiskIds_ = 0;
private int numPartitionsNoDiskIds_ = 0;
private static final Configuration CONF = new Configuration();
/**
* Construct a node to scan given data files into tuples described by 'desc',
* with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -308,7 +322,8 @@ public class HdfsScanNode extends ScanNode {
* ids, based on the given maximum number of bytes each scan range should scan.
* Returns the set of file formats being scanned.
*/
private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer) {
private Set<HdfsFileFormat> computeScanRangeLocations(Analyzer analyzer)
throws ImpalaRuntimeException {
long maxScanRangeLength = analyzer.getQueryCtx().client_request.getQuery_options()
.getMax_scan_range_length();
scanRanges_ = Lists.newArrayList();
@@ -316,7 +331,18 @@ public class HdfsScanNode extends ScanNode {
for (HdfsPartition partition: partitions_) {
fileFormats.add(partition.getFileFormat());
Preconditions.checkState(partition.getId() >= 0);
// Missing disk id accounting is only done for file systems that support the notion
// of disk/storage ids.
FileSystem partitionFs;
try {
partitionFs = partition.getLocationPath().getFileSystem(CONF);
} catch (IOException e) {
throw new ImpalaRuntimeException("Error determining partition fs type", e);
}
boolean checkMissingDiskIds = FileSystemUtil.supportsStorageIds(partitionFs);
boolean partitionMissingDiskIds = false;
for (HdfsPartition.FileDescriptor fileDesc: partition.getFileDescriptors()) {
boolean fileDescMissingDiskIds = false;
for (THdfsFileBlock thriftBlock: fileDesc.getFileBlocks()) {
HdfsPartition.FileBlock block = FileBlock.fromThrift(thriftBlock);
List<Integer> replicaHostIdxs = block.getReplicaHostIdxs();
@@ -337,6 +363,11 @@ public class HdfsScanNode extends ScanNode {
// Translate from network address to the global (to this request) host index.
Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
location.setHost_idx(globalHostIdx);
if (checkMissingDiskIds && block.getDiskId(i) == -1) {
++numScanRangesNoDiskIds_;
partitionMissingDiskIds = true;
fileDescMissingDiskIds = true;
}
location.setVolume_id(block.getDiskId(i));
location.setIs_cached(block.isCached(i));
locations.add(location);
@@ -362,7 +393,15 @@ public class HdfsScanNode extends ScanNode {
currentOffset += currentLength;
}
}
if (fileDescMissingDiskIds) {
++numFilesNoDiskIds_;
if (LOG.isTraceEnabled()) {
LOG.trace("File blocks mapping to unknown disk ids. Dir: " +
partition.getLocation() + " File:" + fileDesc.toString());
}
}
}
if (partitionMissingDiskIds) ++numPartitionsNoDiskIds_;
}
return fileFormats;
}
@@ -554,13 +593,13 @@ public class HdfsScanNode extends ScanNode {
HdfsTable table = (HdfsTable) desc_.getTable();
output.append(String.format("%s%s [%s", prefix, getDisplayLabel(),
getDisplayLabelDetail()));
int numPartitions = partitions_.size();
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal() &&
fragment_.isPartitioned()) {
output.append(", " + fragment_.getDataPartition().getExplainString());
}
output.append("]\n");
if (detailLevel.ordinal() >= TExplainLevel.STANDARD.ordinal()) {
int numPartitions = partitions_.size();
if (tbl_.getNumClusteringCols() == 0) numPartitions = 1;
output.append(String.format("%spartitions=%s/%s files=%s size=%s", detailPrefix,
numPartitions, table.getPartitions().size() - 1, totalFiles_,
@@ -586,6 +625,12 @@ public class HdfsScanNode extends ScanNode {
if (detailLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
output.append(getStatsExplainString(detailPrefix, detailLevel));
output.append("\n");
if (numScanRangesNoDiskIds_ > 0) {
output.append(String.format("%smissing disk ids: " +
"partitions=%s/%s files=%s/%s scan ranges %s/%s\n", detailPrefix,
numPartitionsNoDiskIds_, numPartitions, numFilesNoDiskIds_,
totalFiles_, numScanRangesNoDiskIds_, scanRanges_.size()));
}
}
return output.toString();
}
@@ -663,4 +708,6 @@ public class HdfsScanNode extends ScanNode {
@Override
public boolean hasCorruptTableStats() { return hasCorruptTableStats_; }
public boolean hasMissingDiskIds() { return numScanRangesNoDiskIds_ > 0; }
}

View File

@@ -304,6 +304,16 @@ public class Planner {
hasHeader = true;
}
if (request.query_ctx.isSetTables_missing_diskids()) {
List<String> tableNames = Lists.newArrayList();
for (TTableName tableName: request.query_ctx.getTables_missing_diskids()) {
tableNames.add(tableName.db_name + "." + tableName.table_name);
}
str.append("WARNING: The following tables have scan ranges with missing " +
"disk id information.\n" + Joiner.on(", ").join(tableNames) + "\n");
hasHeader = true;
}
if (request.query_ctx.isDisable_spilling()) {
str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" +
"Reason: Query option disable_unsafe_spills is set, at least one table\n" +

View File

@@ -86,6 +86,7 @@ import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.NotImplementedException;
import org.apache.impala.planner.HdfsScanNode;
import org.apache.impala.planner.PlanFragment;
import org.apache.impala.planner.Planner;
import org.apache.impala.planner.ScanNode;
@@ -947,14 +948,17 @@ public class Frontend {
LOG.trace("get scan range locations");
Set<TTableName> tablesMissingStats = Sets.newTreeSet();
Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet();
Set<TTableName> tablesWithMissingDiskIds = Sets.newTreeSet();
for (ScanNode scanNode: scanNodes) {
result.putToPer_node_scan_ranges(
scanNode.getId().asInt(), scanNode.getScanRangeLocations());
if (scanNode.isTableMissingStats()) {
tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
}
if (scanNode.hasCorruptTableStats()) {
tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift());
TTableName tableName = scanNode.getTupleDesc().getTableName().toThrift();
if (scanNode.isTableMissingStats()) tablesMissingStats.add(tableName);
if (scanNode.hasCorruptTableStats()) tablesWithCorruptStats.add(tableName);
if (scanNode instanceof HdfsScanNode &&
((HdfsScanNode) scanNode).hasMissingDiskIds()) {
tablesWithMissingDiskIds.add(tableName);
}
}
@@ -964,6 +968,9 @@ public class Frontend {
for (TTableName tableName: tablesWithCorruptStats) {
queryCtx.addToTables_with_corrupt_stats(tableName);
}
for (TTableName tableName: tablesWithMissingDiskIds) {
queryCtx.addToTables_missing_diskids(tableName);
}
// Compute resource requirements after scan range locations because the cost
// estimates of scan nodes rely on them.