IMPALA-2558: DCHECK in parquet scanner after block read error

There was an incorrect DCHECK in the parquet scanner. If abort_on_error
is false, the intended behaviour is to skip to the next row group, but
the DCHECK assumed that execution should have aborted if a parse error
was encountered.
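
In outline, the fix replaces that unconditional assertion in the row-group loop with one that tolerates a non-OK parse_status_ when abort_on_error is false. A simplified sketch of the corrected logic (names follow the ProcessSplit() change in the diff below; this is a paraphrase, not the exact patch):

  // Execution may stop here only when abort_on_error is set; otherwise we
  // skip to the next row group and keep going.
  DCHECK(continue_execution || !state_->abort_on_error());
  // Only assert that the row group was fully consumed if no parse error
  // interrupted it part-way through.
  if (parse_status_.ok()) DCHECK_EQ(column_readers_[0]->rep_level(), -1);
  // Reset parse_status_ so the next row group starts clean.
  parse_status_ = Status::OK();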

This also:
- Fixes a DCHECK after an empty row group. InitColumns() would try to
  create empty scan ranges for the column readers.
- Uses metadata_range_->file() instead of stream_->filename() in the
  scanner. InitColumns() was using stream_->filename() in error
  messages, which used to work but now stream_ is set to NULL before
  calling InitColumns().
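
A sketch of the empty-row-group and column-chunk-size handling added for the first fix above (simplified from the ProcessSplit()/InitColumns() changes in the diff below):

  // Skip row groups with no rows entirely; otherwise InitColumns() would try
  // to create zero-length scan ranges for the column readers.
  if (row_group.num_rows == 0) continue;

  // InitColumns() also rejects non-positive column chunk sizes outright.
  if (col_len <= 0) {
    return Status(Substitute("File '$0' contains invalid column chunk size: $1",
        filename(), col_len));
  }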

Change-Id: I8e29e4c0c268c119e1583f16bd6cf7cd59591701
Reviewed-on: http://gerrit.cloudera.org:8080/1257
Reviewed-by: Dan Hecht <dhecht@cloudera.com>
Tested-by: Internal Jenkins
Author: Skye Wanderman-Milne
Date: 2015-10-29 18:07:33 -07:00
Committed by: Internal Jenkins
Parent: 19b6bf0201
Commit: dd2eb951d7
8 changed files with 188 additions and 78 deletions


@@ -236,7 +236,7 @@ class HdfsParquetScanner::ColumnReader {
/// Returns true if this reader materializes collections (i.e. ArrayValues).
virtual bool IsCollectionReader() const { return false; }
virtual const char* filename() const = 0;
const char* filename() const { return parent_->filename(); };
/// Read the current value (or null) into 'tuple' for this column. This should only be
/// called when a value is defined, i.e., def_level() >=
@@ -365,13 +365,6 @@ class HdfsParquetScanner::CollectionColumnReader :
virtual bool IsCollectionReader() const { return true; }
virtual const char* filename() const {
// TODO: this won't be completely accurate if/when we support columns in separate
// files
DCHECK(!children_.empty());
return children_[0]->filename();
}
/// The repetition level indicating that the current value is the first in a new
/// collection (meaning the last value read was the final item in the previous
/// collection).
@@ -389,6 +382,13 @@ class HdfsParquetScanner::CollectionColumnReader :
/// reader's state.
virtual bool NextLevels();
/// This is called once for each row group in the file.
void Reset() {
def_level_ = -1;
rep_level_ = -1;
pos_current_value_ = -1;
}
private:
/// Column readers of fields contained within this collection. There is at least one
/// child reader per collection reader. Child readers either materialize slots in the
@@ -476,8 +476,6 @@ class HdfsParquetScanner::BaseScalarColumnReader :
}
MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); }
virtual const char* filename() const { return stream_->filename(); }
/// Reads the next definition and repetition levels for this column. Initializes the
/// next data page if necessary.
virtual bool NextLevels() { return NextLevels<true>(); }
@@ -578,8 +576,7 @@ class HdfsParquetScanner::BaseScalarColumnReader :
// Pull out slow-path Status construction code from ReadRepetitionLevel()/
// ReadDefinitionLevel() for performance.
void __attribute__((noinline)) SetLevelError(TErrorCode::type error_code) {
parent_->parse_status_ =
Status(error_code, num_buffered_values_, stream_->filename());
parent_->parse_status_ = Status(error_code, num_buffered_values_, filename());
}
/// Writes the next value into *slot using pool if necessary. Also advances rep_level_
@@ -680,8 +677,8 @@ class HdfsParquetScanner::ScalarColumnReader :
if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
page_encoding_ != parquet::Encoding::PLAIN) {
stringstream ss;
ss << "File '" << parent_->metadata_range_->file() << "' is corrupt: unexpected "
<< "encoding: " << PrintEncoding(page_encoding_) << " for data page of column '"
ss << "File '" << filename() << "' is corrupt: unexpected encoding: "
<< PrintEncoding(page_encoding_) << " for data page of column '"
<< schema_element().name << "'.";
return Status(ss.str());
}
@@ -758,8 +755,7 @@ class HdfsParquetScanner::ScalarColumnReader :
/// Pull out slow-path Status construction code from ReadRepetitionLevel()/
/// ReadDefinitionLevel() for performance.
void __attribute__((noinline)) SetDictDecodeError() {
parent_->parse_status_ =
Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, stream_->filename());
parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename());
}
/// Dictionary decoder for decoding column values.
@@ -908,6 +904,7 @@ class HdfsParquetScanner::BoolColumnReader :
Status HdfsParquetScanner::Prepare(ScannerContext* context) {
RETURN_IF_ERROR(HdfsScanner::Prepare(context));
metadata_range_ = stream_->scan_range();
num_cols_counter_ =
ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
num_row_groups_counter_ =
@@ -1060,8 +1057,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
} else if (num_values_read_ > metadata_->num_values) {
// The data pages contain more values than stated in the column metadata.
return Status(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
metadata_->num_values, num_values_read_, node_.element->name,
stream_->filename());
metadata_->num_values, num_values_read_, node_.element->name, filename());
}
int64_t buffer_size;
@@ -1072,7 +1068,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
DCHECK_LT(num_values_read_, metadata_->num_values);
// TODO for 2.3: node_.element->name isn't necessarily useful
return Status(TErrorCode::PARQUET_COLUMN_METADATA_INVALID, metadata_->num_values,
num_values_read_, node_.element->name, stream_->filename());
num_values_read_, node_.element->name, filename());
}
// We don't know the actual header size until the thrift object is deserialized. Loop
@@ -1106,7 +1102,7 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
if (buffer_size == new_buffer_size) {
DCHECK_NE(new_buffer_size, 0);
return Status(TErrorCode::PARQUET_HEADER_EOF, stream_->filename());
return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
}
DCHECK_GT(new_buffer_size, buffer_size);
buffer_size = new_buffer_size;
@@ -1207,14 +1203,14 @@ Status HdfsParquetScanner::BaseScalarColumnReader::ReadDataPage() {
if (max_rep_level() > 0) {
// Initialize the repetition level data
rep_levels_.Init(stream_->filename(),
rep_levels_.Init(filename(),
current_page_header_.data_page_header.repetition_level_encoding,
max_rep_level(), num_buffered_values_, &data_, &data_size);
}
if (max_def_level() > 0) {
// Initialize the definition level data
def_levels_.Init(stream_->filename(),
def_levels_.Init(filename(),
current_page_header_.data_page_header.definition_level_encoding,
max_def_level(), num_buffered_values_, &data_, &data_size);
}
@@ -1423,7 +1419,7 @@ void HdfsParquetScanner::CollectionColumnReader::UpdateDerivedState() {
}
Status HdfsParquetScanner::ValidateColumnOffsets(const parquet::RowGroup& row_group) {
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(metadata_range_->file());
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
for (int i = 0; i < row_group.columns.size(); ++i) {
const parquet::ColumnChunk& col_chunk = row_group.columns[i];
int64_t col_start = col_chunk.meta_data.data_page_offset;
@@ -1477,7 +1473,6 @@ Status HdfsParquetScanner::ProcessSplit() {
DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
// First process the file metadata in the footer
bool eosr;
metadata_range_ = stream_->scan_range();
RETURN_IF_ERROR(ProcessFooter(&eosr));
if (eosr) return Status::OK();
@@ -1492,11 +1487,14 @@ Status HdfsParquetScanner::ProcessSplit() {
// Iterate through each row group in the file and process any row groups that fall
// within this split.
for (int i = 0; i < file_metadata_.row_groups.size(); ++i) {
const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
if (row_group.num_rows == 0) continue;
const DiskIoMgr::ScanRange* split_range =
reinterpret_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
RETURN_IF_ERROR(ValidateColumnOffsets(file_metadata_.row_groups[i]));
RETURN_IF_ERROR(ValidateColumnOffsets(row_group));
int64_t row_group_mid_pos = GetRowGroupMidOffset(file_metadata_.row_groups[i]);
int64_t row_group_mid_pos = GetRowGroupMidOffset(row_group);
int64_t split_offset = split_range->offset();
int64_t split_length = split_range->len();
if (!(row_group_mid_pos >= split_offset &&
@@ -1543,9 +1541,9 @@ Status HdfsParquetScanner::ProcessSplit() {
if (context_->cancelled()) return Status::OK();
RETURN_IF_ERROR(state_->CheckQueryState());
DCHECK(continue_execution);
// We should be at the end of the the row group if we get this far
DCHECK_EQ(column_readers_[0]->rep_level(), -1);
DCHECK(continue_execution || !state_->abort_on_error());
// We should be at the end of the row group if we get this far with no parse error
if (parse_status_.ok()) DCHECK_EQ(column_readers_[0]->rep_level(), -1);
// Reset parse_status_ for the next row group.
parse_status_ = Status::OK();
}
@@ -1704,12 +1702,12 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
if (!success) {
DCHECK(!parse_status_.ok());
if (parse_status_.code() == TErrorCode::SCANNER_INCOMPLETE_READ) {
VLOG_QUERY << "Metadata for file '" << stream_->filename() << "' appears stale: "
VLOG_QUERY << "Metadata for file '" << filename() << "' appears stale: "
<< "metadata states file size to be "
<< PrettyPrinter::Print(stream_->file_desc()->file_length, TUnit::BYTES)
<< ", but could only read "
<< PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES);
return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, stream_->filename(),
return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, filename(),
scan_node_->hdfs_table()->fully_qualified_name());
}
return parse_status_;
@@ -1721,15 +1719,14 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
// Make sure footer has enough bytes to contain the required information.
if (remaining_bytes_buffered < 0) {
return Status(Substitute("File '$0' is invalid. Missing metadata.",
stream_->filename()));
return Status(Substitute("File '$0' is invalid. Missing metadata.", filename()));
}
// Validate magic file bytes are correct.
uint8_t* magic_number_ptr = buffer + len - sizeof(PARQUET_VERSION_NUMBER);
if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER,
sizeof(PARQUET_VERSION_NUMBER)) != 0) {
return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, stream_->filename(),
return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, filename(),
string(reinterpret_cast<char*>(magic_number_ptr), sizeof(PARQUET_VERSION_NUMBER)),
scan_node_->hdfs_table()->fully_qualified_name());
}
@@ -1748,7 +1745,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
// In this case, the metadata is bigger than our guess meaning there are
// not enough bytes in the footer range from IssueInitialRanges().
// We'll just issue more ranges to the IoMgr that is the actual footer.
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(metadata_range_->file());
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
DCHECK(file_desc != NULL);
// The start of the metadata is:
// file_length - 4-byte metadata size - footer-size - metadata size
@@ -1757,7 +1754,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
int64_t metadata_bytes_to_read = metadata_size;
if (metadata_start < 0) {
return Status(Substitute("File $0 is invalid. Invalid metadata size in file "
"footer: $1 bytes. File size: $2 bytes.", stream_->filename(), metadata_size,
"footer: $1 bytes. File size: $2 bytes.", filename(), metadata_size,
file_desc->file_length));
}
// IoMgr can only do a fixed size Read(). The metadata could be larger
@@ -1774,10 +1771,9 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
int64_t to_read = ::min(static_cast<int64_t>(io_mgr->max_read_buffer_size()),
metadata_bytes_to_read);
DiskIoMgr::ScanRange* range = scan_node_->AllocateScanRange(
metadata_range_->fs(), metadata_range_->file(), to_read,
metadata_start + copy_offset, -1, metadata_range_->disk_id(),
metadata_range_->try_cache(), metadata_range_->expected_local(),
file_desc->mtime);
metadata_range_->fs(), filename(), to_read, metadata_start + copy_offset, -1,
metadata_range_->disk_id(), metadata_range_->try_cache(),
metadata_range_->expected_local(), file_desc->mtime);
DiskIoMgr::BufferDescriptor* io_buffer = NULL;
RETURN_IF_ERROR(io_mgr->Read(scan_node_->reader_context(), range, &io_buffer));
@@ -1796,7 +1792,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
DeserializeThriftMsg(metadata_ptr, &metadata_size, true, &file_metadata_);
if (!status.ok()) {
return Status(Substitute("File $0 has invalid file metadata at file offset $1. "
"Error = $2.", stream_->filename(),
"Error = $2.", filename(),
metadata_size + sizeof(PARQUET_VERSION_NUMBER) + sizeof(uint32_t),
status.GetDetail()));
}
@@ -1832,8 +1828,11 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
}
if (file_metadata_.row_groups.empty()) {
return Status(Substitute("Invalid file. This file: $0 has no row groups",
stream_->filename()));
return Status(
Substitute("Invalid file. This file: $0 has no row groups", filename()));
}
if (schema_.children.empty()) {
return Status(Substitute("Invalid file: '$0' has no columns.", filename()));
}
return Status::OK();
}
@@ -1941,7 +1940,7 @@ HdfsParquetScanner::SchemaNode* HdfsParquetScanner::NextSchemaNode(const SchemaP
if (node->children.size() <= file_idx) {
// The selected field is not in the file
VLOG_FILE << Substitute(
"File '$0' does not contain path '$1'", stream_->filename(), PrintPath(path));
"File '$0' does not contain path '$1'", filename(), PrintPath(path));
*missing_field = true;
return NULL;
}
@@ -1991,7 +1990,7 @@ Status HdfsParquetScanner::ResolveArray(ArrayEncoding array_encoding,
bool* missing_field) {
if (array_encoding == ONE_LEVEL) {
if (!(*node)->is_repeated()) {
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, metadata_range_->file(),
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
PrintPath(path, idx), "array", (*node)->DebugString());
return Status::Expected(msg);
}
@@ -1999,7 +1998,7 @@ Status HdfsParquetScanner::ResolveArray(ArrayEncoding array_encoding,
// In the multi-level case, we always expect the outer group to contain a single
// repeated field
if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated()) {
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, metadata_range_->file(),
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
PrintPath(path, idx), "array", (*node)->DebugString());
return Status::Expected(msg);
}
@@ -2038,7 +2037,7 @@ Status HdfsParquetScanner::ResolveMap(const SchemaPath& path, int idx, SchemaNod
bool* missing_field) {
if ((*node)->children.size() != 1 || !(*node)->children[0].is_repeated() ||
(*node)->children[0].children.size() != 2) {
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, metadata_range_->file(),
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
PrintPath(path, idx), "map", (*node)->DebugString());
return Status::Expected(msg);
}
@@ -2053,13 +2052,13 @@ Status HdfsParquetScanner::ResolveMap(const SchemaPath& path, int idx, SchemaNod
Status HdfsParquetScanner::ValidateScalarNode(const SchemaNode& node,
const ColumnType& col_type, const SchemaPath& path, int idx) {
if (!node.children.empty()) {
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, metadata_range_->file(),
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
PrintPath(path, idx), col_type.DebugString(), node.DebugString());
return Status::Expected(msg);
}
parquet::Type::type type = IMPALA_TO_PARQUET_TYPES[col_type.type];
if (type != node.element->type) {
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, metadata_range_->file(),
ErrorMsg msg(TErrorCode::PARQUET_UNRECOGNIZED_SCHEMA, filename(),
PrintPath(path, idx), col_type.DebugString(), node.DebugString());
return Status::Expected(msg);
}
@@ -2149,7 +2148,7 @@ Status HdfsParquetScanner::CreateCountingReader(
if (missing_field) {
// TODO: can we do anything else here?
return Status(Substitute(
"Could not find '$0' in file.", PrintPath(parent_path), stream_->filename()));
"Could not find '$0' in file.", PrintPath(parent_path), filename()));
}
DCHECK(!pos_field);
DCHECK(parent_path.empty() || parent_node->is_repeated());
@@ -2197,7 +2196,7 @@ Status HdfsParquetScanner::CreateCountingReader(
Status HdfsParquetScanner::InitColumns(
int row_group_idx, const vector<ColumnReader*>& column_readers) {
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(metadata_range_->file());
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(filename());
DCHECK(file_desc != NULL);
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
@@ -2211,9 +2210,10 @@ Status HdfsParquetScanner::InitColumns(
BOOST_FOREACH(ColumnReader* col_reader, column_readers) {
if (col_reader->IsCollectionReader()) {
// Recursively init child readers
CollectionColumnReader* collection_reader =
static_cast<CollectionColumnReader*>(col_reader);
collection_reader->Reset();
// Recursively init child readers
RETURN_IF_ERROR(InitColumns(row_group_idx, *collection_reader->children()));
continue;
}
@@ -2230,7 +2230,7 @@ Status HdfsParquetScanner::InitColumns(
// TODO for 2.3: improve this error message by saying which columns are different,
// and also specify column in other error messages as appropriate
return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
col_chunk.meta_data.num_values, num_values, stream_->filename());
col_chunk.meta_data.num_values, num_values, filename());
}
RETURN_IF_ERROR(ValidateColumn(*scalar_reader, row_group_idx));
@@ -2241,6 +2241,10 @@ Status HdfsParquetScanner::InitColumns(
col_start = col_chunk.meta_data.dictionary_page_offset;
}
int64_t col_len = col_chunk.meta_data.total_compressed_size;
if (col_len <= 0) {
return Status(Substitute("File '$0' contains invalid column chunk size: $1",
filename(), col_len));
}
int64_t col_end = col_start + col_len;
// Already validated in ValidateColumnOffsets()
@@ -2257,7 +2261,7 @@ Status HdfsParquetScanner::InitColumns(
// TODO: this will need to change when we have co-located files and the columns
// are different files.
if (!col_chunk.file_path.empty()) {
FILE_CHECK_EQ(col_chunk.file_path, string(metadata_range_->file()));
FILE_CHECK_EQ(col_chunk.file_path, string(filename()));
}
const DiskIoMgr::ScanRange* split_range =
@@ -2269,9 +2273,9 @@ Status HdfsParquetScanner::InitColumns(
col_end <= split_range->offset() + split_range->len();
DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(
metadata_range_->fs(), metadata_range_->file(), col_len, col_start,
scalar_reader->col_idx(), split_range->disk_id(), split_range->try_cache(),
column_range_local, file_desc->mtime);
metadata_range_->fs(), filename(), col_len, col_start, scalar_reader->col_idx(),
split_range->disk_id(), split_range->try_cache(), column_range_local,
file_desc->mtime);
col_ranges.push_back(col_range);
// Get the stream that will be used for this column
@@ -2313,7 +2317,7 @@ Status HdfsParquetScanner::CreateSchemaTree(
const {
if (*idx >= schema.size()) {
return Status(Substitute("File $0 corrupt: could not reconstruct schema tree from "
"flattened schema in file metadata", stream_->filename()));
"flattened schema in file metadata", filename()));
}
node->element = &schema[*idx];
++(*idx);
@@ -2401,7 +2405,7 @@ bool HdfsParquetScanner::FileVersion::VersionEq(int major, int minor, int patch)
Status HdfsParquetScanner::ValidateFileMetadata() {
if (file_metadata_.version > PARQUET_CURRENT_VERSION) {
stringstream ss;
ss << "File: " << stream_->filename() << " is of an unsupported version. "
ss << "File: " << filename() << " is of an unsupported version. "
<< "file version: " << file_metadata_.version;
return Status(ss.str());
}
@@ -2437,7 +2441,7 @@ Status HdfsParquetScanner::ValidateColumn(
for (int i = 0; i < encodings.size(); ++i) {
if (!IsEncodingSupported(encodings[i])) {
stringstream ss;
ss << "File '" << metadata_range_->file() << "' uses an unsupported encoding: "
ss << "File '" << filename() << "' uses an unsupported encoding: "
<< PrintEncoding(encodings[i]) << " for column '" << schema_element.name
<< "'.";
return Status(ss.str());
@@ -2449,7 +2453,7 @@ Status HdfsParquetScanner::ValidateColumn(
file_data.meta_data.codec != parquet::CompressionCodec::SNAPPY &&
file_data.meta_data.codec != parquet::CompressionCodec::GZIP) {
stringstream ss;
ss << "File '" << metadata_range_->file() << "' uses an unsupported compression: "
ss << "File '" << filename() << "' uses an unsupported compression: "
<< file_data.meta_data.codec << " for column '" << schema_element.name
<< "'.";
return Status(ss.str());
@@ -2473,14 +2477,14 @@ Status HdfsParquetScanner::ValidateColumn(
// We require that the scale and byte length be set.
if (schema_element.type != parquet::Type::FIXED_LEN_BYTE_ARRAY) {
stringstream ss;
ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
ss << "File '" << filename() << "' column '" << schema_element.name
<< "' should be a decimal column encoded using FIXED_LEN_BYTE_ARRAY.";
return Status(ss.str());
}
if (!schema_element.__isset.type_length) {
stringstream ss;
ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
ss << "File '" << filename() << "' column '" << schema_element.name
<< "' does not have type_length set.";
return Status(ss.str());
}
@@ -2488,7 +2492,7 @@ Status HdfsParquetScanner::ValidateColumn(
int expected_len = ParquetPlainEncoder::DecimalSize(slot_desc->type());
if (schema_element.type_length != expected_len) {
stringstream ss;
ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
ss << "File '" << filename() << "' column '" << schema_element.name
<< "' has an invalid type length. Expecting: " << expected_len
<< " len in file: " << schema_element.type_length;
return Status(ss.str());
@@ -2496,7 +2500,7 @@ Status HdfsParquetScanner::ValidateColumn(
if (!schema_element.__isset.scale) {
stringstream ss;
ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
ss << "File '" << filename() << "' column '" << schema_element.name
<< "' does not have the scale set.";
return Status(ss.str());
}
@@ -2504,7 +2508,7 @@ Status HdfsParquetScanner::ValidateColumn(
if (schema_element.scale != slot_desc->type().scale) {
// TODO: we could allow a mismatch and do a conversion at this step.
stringstream ss;
ss << "File '" << metadata_range_->file() << "' column '" << schema_element.name
ss << "File '" << filename() << "' column '" << schema_element.name
<< "' has a scale that does not match the table metadata scale."
<< " File metadata scale: " << schema_element.scale
<< " Table metadata scale: " << slot_desc->type().scale;
@@ -2513,14 +2517,13 @@ Status HdfsParquetScanner::ValidateColumn(
// The other decimal metadata should be there but we don't need it.
if (!schema_element.__isset.precision) {
ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION,
metadata_range_->file(), schema_element.name);
ErrorMsg msg(TErrorCode::PARQUET_MISSING_PRECISION, filename(),
schema_element.name);
LOG_OR_RETURN_ON_ERROR(msg, state_);
} else {
if (schema_element.precision != slot_desc->type().precision) {
// TODO: we could allow a mismatch and do a conversion at this step.
ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION,
metadata_range_->file(), schema_element.name,
ErrorMsg msg(TErrorCode::PARQUET_WRONG_PRECISION, filename(), schema_element.name,
schema_element.precision, slot_desc->type().precision);
LOG_OR_RETURN_ON_ERROR(msg, state_);
}
@@ -2529,14 +2532,14 @@ Status HdfsParquetScanner::ValidateColumn(
if (!is_converted_type_decimal) {
// TODO: is this validation useful? It is not required at all to read the data and
// might only serve to reject otherwise perfectly readable files.
ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE,
metadata_range_->file(), schema_element.name);
ErrorMsg msg(TErrorCode::PARQUET_BAD_CONVERTED_TYPE, filename(),
schema_element.name);
LOG_OR_RETURN_ON_ERROR(msg, state_);
}
} else if (schema_element.__isset.scale || schema_element.__isset.precision ||
is_converted_type_decimal) {
ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL,
metadata_range_->file(), schema_element.name, slot_desc->type().DebugString());
ErrorMsg msg(TErrorCode::PARQUET_INCOMPATIBLE_DECIMAL, filename(),
schema_element.name, slot_desc->type().DebugString());
LOG_OR_RETURN_ON_ERROR(msg, state_);
}
return Status::OK();
@@ -2554,9 +2557,8 @@ Status HdfsParquetScanner::ValidateEndOfRowGroup(
// rows read from the file.
int64_t expected_rows_in_group = file_metadata_.row_groups[row_group_idx].num_rows;
if (rows_read != expected_rows_in_group) {
HdfsParquetScanner::ColumnReader* reader = column_readers_[0];
return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, reader->filename(),
row_group_idx, expected_rows_in_group, rows_read);
return Status(TErrorCode::PARQUET_GROUP_ROW_COUNT_ERROR, filename(), row_group_idx,
expected_rows_in_group, rows_read);
}
}


@@ -398,6 +398,8 @@ class HdfsParquetScanner : public HdfsScanner {
/// Number of row groups that need to be read.
RuntimeProfile::Counter* num_row_groups_counter_;
const char* filename() const { return metadata_range_->file(); }
/// Reads data using 'column_readers' to materialize instances of 'tuple_desc'
/// (including recursively reading collections).
///

testdata/data/README

@@ -22,3 +22,19 @@ hive> INSERT INTO TABLE tbl
alltypesagg_hive_13_1.parquet:
Generated with parquet-mr version 1.5.0-cdh5.4.0-SNAPSHOT
hive> create table alltypesagg_hive_13_1 stored as parquet as select * from alltypesagg;
bad_column_metadata.parquet:
Generated with hacked version of parquet-mr 1.8.2-SNAPSHOT
Schema:
{"type": "record",
"namespace": "com.cloudera.impala",
"name": "bad_column_metadata",
"fields": [
{"name": "id", "type": ["null", "long"]},
{"name": "int_array", "type": ["null", {"type": "array", "items": ["null", "int"]}]}
]
}
Contains 3 row groups, each with ten rows and each array containing ten elements. The
first rowgroup column metadata for 'int_array' incorrectly states there are 50 values
(instead of 100), and the second rowgroup column metadata for 'id' incorrectly states
there are 11 values (instead of 10). The third rowgroup has the correct metadata.
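
For context, a simplified sketch of the per-column check in HdfsParquetScanner::InitColumns() that these bad counts trip (the error construction is taken from the scanner diff above; the surrounding condition is paraphrased). With abort_on_error=false the scanner now skips the offending row group instead of hitting a DCHECK:

  // Paraphrased: when the value count declared in the column chunk metadata
  // does not match the count expected for this row group, report an error
  // naming the column and file.
  if (col_chunk.meta_data.num_values != num_values) {
    return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
        col_chunk.meta_data.num_values, num_values, filename());
  }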

Binary file not shown.


@@ -1418,6 +1418,19 @@ hadoop fs -put -f ${IMPALA_HOME}/testdata/data/kite_required_fields.parquet \
/test-warehouse/kite_required_fields_parquet/
====
---- DATASET
-- Parquet file with incorrect column metadata in multiple row groups
functional
---- BASE_TABLE_NAME
bad_column_metadata
---- COLUMNS
id bigint
int_array array<int>
---- LOAD
`hadoop fs -mkdir -p /test-warehouse/bad_column_metadata_parquet && \
hadoop fs -put -f ${IMPALA_HOME}/testdata/data/bad_column_metadata.parquet \
/test-warehouse/bad_column_metadata_parquet
====
---- DATASET
functional
---- BASE_TABLE_NAME
bad_serde


@@ -42,6 +42,7 @@ table_name:bad_dict_page_offset, constraint:restrict_to, table_format:parquet/no
table_name:bad_compressed_size, constraint:restrict_to, table_format:parquet/none/none
table_name:alltypesagg_hive_13_1, constraint:restrict_to, table_format:parquet/none/none
table_name:kite_required_fields, constraint:restrict_to, table_format:parquet/none/none
table_name:bad_column_metadata, constraint:restrict_to, table_format:parquet/none/none
table_name:lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none
# TODO: Support Avro. Data loading currently fails for Avro because complex types
# cannot be converted to the corresponding Avro types yet.
table_name:allcomplextypes, constraint:restrict_to, table_format:text/none/none
table_name:allcomplextypes, constraint:restrict_to, table_format:parquet/none/none
table_name:allcomplextypes, constraint:restrict_to, table_format:hbase/none/none
table_name:functional, constraint:restrict_to, table_format:text/none/none
table_name:complextypes_fileformat, constraint:restrict_to, table_format:text/none/none


@@ -0,0 +1,72 @@
====
---- QUERY
# IMPALA-2558: trigger bad parse_status_ in HdfsParquetScanner::AssembleRows()
select id, cnt from bad_column_metadata t, (select count(*) cnt from t.int_array) v
---- TYPES
bigint,bigint
---- RESULTS
1,10
2,10
3,10
4,10
5,10
6,10
7,10
8,10
9,10
11,10
12,10
13,10
14,10
15,10
16,10
17,10
18,10
19,10
21,10
22,10
23,10
24,10
25,10
26,10
27,10
28,10
29,10
30,10
====
---- QUERY
# IMPALA-2558
select id from bad_column_metadata
---- TYPES
bigint
---- RESULTS
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
21
22
23
24
25
26
27
28
29
30
====


@@ -200,9 +200,13 @@ class TestParquet(ImpalaTestSuite):
def test_parquet(self, vector):
self.run_test_case('QueryTest/parquet', vector)
def test_continue_on_error(self, vector):
vector.get_value('exec_option')['abort_on_error'] = 0
self.run_test_case('QueryTest/parquet-continue-on-error', vector)
@SkipIfS3.hdfs_block_size
@SkipIfIsilon.hdfs_block_size
def test_verify_runtime_profile(self, vector):
def test_multiple_blocks(self, vector):
# For IMPALA-1881. The table functional_parquet.lineitem_multiblock has 3 blocks, so
# we verify if each impalad reads one block by checking if each impalad reads at
# least one row group.