IMPALA-5448: fix invalid number of splits reported in Parquet scan node

Parquet splits with multiple columns are marked as completed via
HdfsScanNodeBase::RangeComplete(), which counts the file type once per
column codec type. As a result, the reported number of Parquet splits is
the real count multiplied by the number of materialized columns.

Furthermore, the Parquet format allows different columns to be compressed
with different codecs. This patch handles that case as well: a Parquet file
using both gzip and snappy is reported as:
	File Formats: PARQUET/(GZIP,SNAPPY):1

To handle these cases, this patch introduces a compression type set that is
used as part of the per-file-format split counter key.
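
For illustration only, here is a minimal standalone C++ sketch (hypothetical
enum and a simplified counter key, not the actual Impala types) of why the old
per-column increment inflates the split count and how folding the column codecs
into a single set-valued key fixes it:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <tuple>
    #include <vector>

    // Hypothetical stand-in for the THdfsCompression Thrift enum.
    enum class Compression { NONE, GZIP, SNAPPY };

    int main() {
      // One completed Parquet split with three materialized columns, all snappy.
      std::vector<Compression> column_codecs(3, Compression::SNAPPY);

      // Old behavior: one increment per column codec, so one split is counted 3 times.
      std::map<std::tuple<bool, Compression>, int> old_counts;
      for (Compression c : column_codecs) ++old_counts[std::make_tuple(false, c)];
      std::cout << "old: " << old_counts[std::make_tuple(false, Compression::SNAPPY)]
                << "\n";  // prints 3

      // Fixed behavior: fold the column codecs into one set-valued key, increment once.
      uint32_t codec_set = 0;
      for (Compression c : column_codecs) codec_set |= 1u << static_cast<int>(c);
      std::map<std::tuple<bool, uint32_t>, int> new_counts;
      ++new_counts[std::make_tuple(false, codec_set)];
      std::cout << "new: " << new_counts[std::make_tuple(false, codec_set)] << "\n";  // prints 1
    }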

Testing:
Added end-to-end tests covering Parquet files with all columns compressed
with snappy, as well as Parquet files that mix compression codecs.

Change-Id: Iaacc2d775032f5707061e704f12e0a63cde695d1
Reviewed-on: http://gerrit.cloudera.org:8080/8147
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins
Author: stiga-huang
Date: 2017-09-26 16:45:11 -07:00
Committed-by: Impala Public Jenkins
Parent: adb92d3397
Commit: 192cd96d9e
7 changed files with 132 additions and 13 deletions


@@ -697,9 +697,11 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
     const vector<THdfsCompression::type>& compression_types, bool skipped) {
   scan_ranges_complete_counter()->Add(1);
   progress_.Update(1);
+  HdfsCompressionTypesSet compression_set;
   for (int i = 0; i < compression_types.size(); ++i) {
-    ++file_type_counts_[std::make_tuple(file_type, skipped, compression_types[i])];
+    compression_set.AddType(compression_types[i]);
   }
+  ++file_type_counts_[std::make_tuple(file_type, skipped, compression_set)];
 }
 
 void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
@@ -786,19 +788,33 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
         THdfsFileFormat::type file_format = std::get<0>(it->first);
         bool skipped = std::get<1>(it->first);
-        THdfsCompression::type compression_type = std::get<2>(it->first);
+        HdfsCompressionTypesSet compressions_set = std::get<2>(it->first);
+        int file_cnt = it->second;
 
         if (skipped) {
           if (file_format == THdfsFileFormat::PARQUET) {
             // If a scan range stored as parquet is skipped, its compression type
             // cannot be figured out without reading the data.
-            ss << file_format << "/" << "Unknown" << "(Skipped):" << it->second << " ";
+            ss << file_format << "/" << "Unknown" << "(Skipped):" << file_cnt << " ";
           } else {
-            ss << file_format << "/" << compression_type << "(Skipped):"
-               << it->second << " ";
+            ss << file_format << "/" << compressions_set.GetFirstType() << "(Skipped):"
+               << file_cnt << " ";
           }
+        } else if (compressions_set.Size() == 1) {
+          ss << file_format << "/" << compressions_set.GetFirstType() << ":" << file_cnt
+             << " ";
         } else {
-          ss << file_format << "/" << compression_type << ":" << it->second << " ";
+          ss << file_format << "/" << "(";
+          bool first = true;
+          for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
+            THdfsCompression::type type = static_cast<THdfsCompression::type>(
+                elem.first);
+            if (!compressions_set.HasType(type)) continue;
+            if (!first) ss << ",";
+            ss << type;
+            first = false;
+          }
+          ss << "):" << file_cnt << " ";
         }
       }
     }
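
As a side note, the mixed-codec report string produced above can be reproduced
with a small standalone sketch (hypothetical codec ids and names standing in
for _THdfsCompression_VALUES_TO_NAMES); it walks the known codec names in order
and keeps only the ones present in the split's codec set:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <sstream>
    #include <string>

    int main() {
      // Hypothetical codec ids and names; the real code iterates the Thrift name map.
      const std::map<int, std::string> codec_names = {
          {0, "NONE"}, {1, "GZIP"}, {2, "SNAPPY"}};

      uint32_t codec_bitmap = (1u << 1) | (1u << 2);  // this split mixes GZIP and SNAPPY
      int file_cnt = 2;                               // two such splits were completed

      std::ostringstream ss;
      ss << "PARQUET/(";
      bool first = true;
      for (const auto& elem : codec_names) {
        if ((codec_bitmap & (1u << elem.first)) == 0) continue;  // codec not in this split
        if (!first) ss << ",";
        ss << elem.second;
        first = false;
      }
      ss << "):" << file_cnt;
      std::cout << ss.str() << "\n";  // prints PARQUET/(GZIP,SNAPPY):2
    }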


@@ -479,13 +479,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// scanner threads.
   Status status_;
 
-  /// Mapping of file formats (file type, compression type) to the number of
-  /// splits of that type and the lock protecting it.
-  typedef std::map<
-      std::tuple<THdfsFileFormat::type, bool, THdfsCompression::type>,
-      int> FileTypeCountsMap;
-  FileTypeCountsMap file_type_counts_;
-
   /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
   /// issues initial ranges for all file types. Waits for runtime filters if necessary.
   /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
@@ -525,6 +518,55 @@ class HdfsScanNodeBase : public ScanNode {
   /// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
   /// debug action specified for the query.
   Status ScanNodeDebugAction(TExecNodePhase::type phase) WARN_UNUSED_RESULT;
+
+ private:
+  class HdfsCompressionTypesSet {
+   public:
+    HdfsCompressionTypesSet(): bit_map_(0) {
+      DCHECK_GE(sizeof(bit_map_) * CHAR_BIT, _THdfsCompression_VALUES_TO_NAMES.size());
+    }
+
+    bool HasType(THdfsCompression::type type) {
+      return (bit_map_ & (1 << type)) != 0;
+    }
+
+    void AddType(const THdfsCompression::type type) {
+      bit_map_ |= (1 << type);
+    }
+
+    int Size() { return BitUtil::Popcount(bit_map_); }
+
+    THdfsCompression::type GetFirstType() {
+      DCHECK_GT(Size(), 0);
+      for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
+        THdfsCompression::type type = static_cast<THdfsCompression::type>(elem.first);
+        if (HasType(type)) return type;
+      }
+      return THdfsCompression::NONE;
+    }
+
+    // The following operator overloading is needed so this class can be part of the
+    // std::map key.
+    bool operator< (const HdfsCompressionTypesSet& o) const {
+      return bit_map_ < o.bit_map_;
+    }
+
+    bool operator== (const HdfsCompressionTypesSet& o) const {
+      return bit_map_ == o.bit_map_;
+    }
+
+   private:
+    uint32_t bit_map_;
+  };
+
+  /// Mapping of file formats to the number of splits of that type. The key is a tuple
+  /// containing:
+  /// * file type
+  /// * whether the split was skipped
+  /// * compression types set
+  typedef std::map<std::tuple<THdfsFileFormat::type, bool, HdfsCompressionTypesSet>, int>
+      FileTypeCountsMap;
+  FileTypeCountsMap file_type_counts_;
 };
 
 }
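
A rough usage sketch of the idea behind the class above (a simplified local
stand-in, not the real HdfsCompressionTypesSet): because the set participates
in the std::map key, it needs operator<, and two splits with the same codec
combination collapse into a single counter entry:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <tuple>

    // Simplified stand-in: a bitmap of codec ids, ordered by its raw bits.
    struct CodecSet {
      uint32_t bit_map = 0;
      void AddType(int codec) { bit_map |= (1u << codec); }
      bool operator<(const CodecSet& o) const { return bit_map < o.bit_map; }
    };

    int main() {
      const int kGzip = 1, kSnappy = 2;
      // (file format, skipped, codec set) -> number of completed splits
      std::map<std::tuple<std::string, bool, CodecSet>, int> counts;

      // Two completed splits, each mixing gzip- and snappy-compressed columns.
      for (int split = 0; split < 2; ++split) {
        CodecSet s;
        s.AddType(kGzip);
        s.AddType(kSnappy);
        ++counts[std::make_tuple(std::string("PARQUET"), false, s)];
      }
      std::cout << counts.size() << " map entry, count = "
                << counts.begin()->second << "\n";  // prints: 1 map entry, count = 2
    }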


@@ -0,0 +1,8 @@
+These Parquet files were created by modifying Impala's HdfsParquetTableWriter.
+
+String Data
+-----------
+These files have two string columns, 'a' and 'b'. Each column uses a different compression type.
+
+tinytable_0_gzip_snappy.parq: column 'a' is compressed by gzip, column 'b' is compressed by snappy
+tinytable_1_snappy_gzip.parq: column 'a' is compressed by snappy, column 'b' is compressed by gzip


@@ -0,0 +1,20 @@
+# Regression test for IMPALA-5448
+# This query will do a full scan on a parquet file
+select * from functional_parquet.alltypestiny where year=2009 and month=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/SNAPPY:1
+====
+---- QUERY
+# This query will do a full scan on a parquet table with two partitions.
+# Each partition uses different compression types.
+select * from alltypes_multi_compression
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/GZIP:1 PARQUET/SNAPPY:1
+====
+---- QUERY
+# This query will do a full scan on a parquet table with multiple
+# compression types
+select * from multi_compression
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/\(GZIP,SNAPPY\):2
+====


@@ -327,6 +327,39 @@ class TestParquet(ImpalaTestSuite):
     assert len(result.data) == 1
     assert "4294967294" in result.data
 
+  def test_multi_compression_types(self, vector, unique_database):
+    """IMPALA-5448: Tests that parquet splits with multi compression types are counted
+    correctly. Cases tested:
+    - parquet file with columns using the same compression type
+    - parquet files using snappy and gzip compression types
+    """
+    self.client.execute("create table %s.alltypes_multi_compression like"
+        " functional_parquet.alltypes" % unique_database)
+    hql_format = "set parquet.compression={codec};" \
+        "insert into table %s.alltypes_multi_compression" \
+        " partition (year = {year}, month = {month})" \
+        " select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col," \
+        " float_col, double_col,date_string_col,string_col,timestamp_col" \
+        " from functional_parquet.alltypes" \
+        " where year = {year} and month = {month}" % unique_database
+    check_call(['hive', '-e', hql_format.format(codec="snappy", year=2010, month=1)])
+    check_call(['hive', '-e', hql_format.format(codec="gzip", year=2010, month=2)])
+
+    self.client.execute("create table %s.multi_compression (a string, b string)"
+        " stored as parquet" % unique_database)
+    multi_compression_tbl_loc =\
+        get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, "multi_compression"))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
+        multi_compression_tbl_loc])
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq",
+        multi_compression_tbl_loc])
+
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/hdfs_parquet_scan_node_profile',
+        vector, unique_database)
+
   def test_corrupt_rle_counts(self, vector, unique_database):
     """IMPALA-3646: Tests that a certain type of file corruption for plain
     dictionary encoded values is gracefully handled. Cases tested: