diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 59d4ca277..1cfee5e54 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -697,9 +697,11 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
     const vector<THdfsCompression::type>& compression_types, bool skipped) {
   scan_ranges_complete_counter()->Add(1);
   progress_.Update(1);
+  HdfsCompressionTypesSet compression_set;
   for (int i = 0; i < compression_types.size(); ++i) {
-    ++file_type_counts_[std::make_tuple(file_type, skipped, compression_types[i])];
+    compression_set.AddType(compression_types[i]);
   }
+  ++file_type_counts_[std::make_tuple(file_type, skipped, compression_set)];
 }
 
 void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
@@ -786,19 +788,33 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
       THdfsFileFormat::type file_format = std::get<0>(it->first);
       bool skipped = std::get<1>(it->first);
-      THdfsCompression::type compression_type = std::get<2>(it->first);
+      HdfsCompressionTypesSet compressions_set = std::get<2>(it->first);
+      int file_cnt = it->second;
       if (skipped) {
         if (file_format == THdfsFileFormat::PARQUET) {
           // If a scan range stored as parquet is skipped, its compression type
           // cannot be figured out without reading the data.
-          ss << file_format << "/" << "Unknown" << "(Skipped):" << it->second << " ";
+          ss << file_format << "/" << "Unknown" << "(Skipped):" << file_cnt << " ";
         } else {
-          ss << file_format << "/" << compression_type << "(Skipped):"
-             << it->second << " ";
+          ss << file_format << "/" << compressions_set.GetFirstType() << "(Skipped):"
+             << file_cnt << " ";
         }
+      } else if (compressions_set.Size() == 1) {
+        ss << file_format << "/" << compressions_set.GetFirstType() << ":" << file_cnt
+           << " ";
       } else {
-        ss << file_format << "/" << compression_type << ":" << it->second << " ";
+        ss << file_format << "/" << "(";
+        bool first = true;
+        for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
+          THdfsCompression::type type = static_cast<THdfsCompression::type>(
+              elem.first);
+          if (!compressions_set.HasType(type)) continue;
+          if (!first) ss << ",";
+          ss << type;
+          first = false;
+        }
+        ss << "):" << file_cnt << " ";
       }
     }
   }
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index da95ecc4a..252964bfe 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -479,13 +479,6 @@ class HdfsScanNodeBase : public ScanNode {
   /// scanner threads.
   Status status_;
 
-  /// Mapping of file formats (file type, compression type) to the number of
-  /// splits of that type and the lock protecting it.
-  typedef std::map<
-      std::tuple<THdfsFileFormat::type, bool, THdfsCompression::type>,
-      int> FileTypeCountsMap;
-  FileTypeCountsMap file_type_counts_;
-
   /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
   /// issues initial ranges for all file types. Waits for runtime filters if necessary.
   /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
@@ -525,6 +518,55 @@ class HdfsScanNodeBase : public ScanNode {
   /// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
   /// debug action specified for the query.
   Status ScanNodeDebugAction(TExecNodePhase::type phase) WARN_UNUSED_RESULT;
+
+ private:
+  class HdfsCompressionTypesSet {
+   public:
+    HdfsCompressionTypesSet(): bit_map_(0) {
+      DCHECK_GE(sizeof(bit_map_) * CHAR_BIT, _THdfsCompression_VALUES_TO_NAMES.size());
+    }
+
+    bool HasType(THdfsCompression::type type) {
+      return (bit_map_ & (1 << type)) != 0;
+    }
+
+    void AddType(const THdfsCompression::type type) {
+      bit_map_ |= (1 << type);
+    }
+
+    int Size() { return BitUtil::Popcount(bit_map_); }
+
+    THdfsCompression::type GetFirstType() {
+      DCHECK_GT(Size(), 0);
+      for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
+        THdfsCompression::type type = static_cast<THdfsCompression::type>(elem.first);
+        if (HasType(type)) return type;
+      }
+      return THdfsCompression::NONE;
+    }
+
+    // The following operator overloading is needed so this class can be part of the
+    // std::map key.
+    bool operator< (const HdfsCompressionTypesSet& o) const {
+      return bit_map_ < o.bit_map_;
+    }
+
+    bool operator== (const HdfsCompressionTypesSet& o) const {
+      return bit_map_ == o.bit_map_;
+    }
+
+   private:
+    uint32_t bit_map_;
+  };
+
+  /// Mapping of file formats to the number of splits of that type. The key is a tuple
+  /// containing:
+  /// * file type
+  /// * whether the split was skipped
+  /// * compression types set
+  typedef std::map<std::tuple<THdfsFileFormat::type, bool, HdfsCompressionTypesSet>, int>
+      FileTypeCountsMap;
+  FileTypeCountsMap file_type_counts_;
 };
 
 }
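The core of the backend change is the HdfsCompressionTypesSet bitmap added above. The following is a minimal standalone sketch of the same idea, not Impala code: a plain enum stands in for the Thrift-generated THdfsCompression::type, std::bitset::count stands in for BitUtil::Popcount, and the Codec/CodecSet names are illustrative only. It shows why a uint32_t bitmap with operator< is enough to act as part of a std::tuple map key while still answering how many distinct codecs a scan range used.

#include <bitset>
#include <cstdint>
#include <iostream>
#include <map>
#include <tuple>

// Stand-in for the Thrift-generated THdfsCompression::type enum (values illustrative).
enum class Codec : int { NONE = 0, GZIP = 2, SNAPPY = 5 };

// Simplified analogue of HdfsCompressionTypesSet: a set of codecs stored as bits
// of a uint32_t.
class CodecSet {
 public:
  CodecSet() : bit_map_(0) {}

  void Add(Codec c) { bit_map_ |= (1u << static_cast<int>(c)); }
  bool Has(Codec c) const { return (bit_map_ & (1u << static_cast<int>(c))) != 0; }

  // Number of distinct codecs in the set: population count of the bitmap
  // (the patch uses BitUtil::Popcount for this).
  int Size() const { return static_cast<int>(std::bitset<32>(bit_map_).count()); }

  // Ordering on the raw bitmap lets the set participate in a std::map key,
  // mirroring operator< in the patch.
  bool operator<(const CodecSet& o) const { return bit_map_ < o.bit_map_; }

 private:
  uint32_t bit_map_;
};

int main() {
  // Simplified analogue of file_type_counts_: scan-range counts keyed by
  // (skipped?, codec set).
  std::map<std::tuple<bool, CodecSet>, int> counts;

  CodecSet mixed;
  mixed.Add(Codec::GZIP);
  mixed.Add(Codec::SNAPPY);
  ++counts[std::make_tuple(false, mixed)];  // one scan range with GZIP and SNAPPY columns
  ++counts[std::make_tuple(false, mixed)];  // another range with the same codec mix

  for (const auto& kv : counts) {
    const CodecSet& set = std::get<1>(kv.first);
    std::cout << "codecs in set: " << set.Size() << ", ranges: " << kv.second << "\n";
  }
  return 0;
}

Keying the counts map by the whole codec set, rather than incrementing once per codec as the old RangeComplete() loop did, is what keeps a scan range that mixes two codecs from being counted twice in the profile.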
diff --git a/testdata/multi_compression_parquet_data/README b/testdata/multi_compression_parquet_data/README
new file mode 100644
index 000000000..332f24df8
--- /dev/null
+++ b/testdata/multi_compression_parquet_data/README
@@ -0,0 +1,8 @@
+These Parquet files were created by modifying Impala's HdfsParquetTableWriter.
+
+String Data
+-----------
+These files have two string columns 'a' and 'b'. Each column uses a different compression type.
+
+tinytable_0_gzip_snappy.parq: column 'a' is compressed by gzip, column 'b' is compressed by snappy
+tinytable_1_snappy_gzip.parq: column 'a' is compressed by snappy, column 'b' is compressed by gzip
diff --git a/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq b/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq
new file mode 100644
index 000000000..4fd468bd8
Binary files /dev/null and b/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq differ
diff --git a/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq b/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq
new file mode 100644
index 000000000..e5d222056
Binary files /dev/null and b/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq differ
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test
new file mode 100644
index 000000000..7e55ab1ea
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test
@@ -0,0 +1,20 @@
+# Regression test for IMPALA-5448
+# This query will do a full scan on a parquet file
+select * from functional_parquet.alltypestiny where year=2009 and month=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/SNAPPY:1
+====
+---- QUERY
+# This query will do a full scan on a parquet table with two partitions.
+# Each partition uses a different compression type.
+select * from alltypes_multi_compression
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/GZIP:1 PARQUET/SNAPPY:1
+====
+---- QUERY
+# This query will do a full scan on a parquet table whose files use multiple
+# compression types within the same file
+select * from multi_compression
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/\(GZIP,SNAPPY\):2
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index f4f2fd6e7..ca7cd6b78 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -327,6 +327,39 @@ class TestParquet(ImpalaTestSuite):
     assert len(result.data) == 1
     assert "4294967294" in result.data
 
+  def test_multi_compression_types(self, vector, unique_database):
+    """IMPALA-5448: Tests that parquet splits with multiple compression types are
+    counted correctly. Cases tested:
+    - a partitioned parquet table where each partition uses a different codec
+    - parquet files whose columns use different codecs (snappy and gzip) in one file
+    """
+    self.client.execute("create table %s.alltypes_multi_compression like"
+        " functional_parquet.alltypes" % unique_database)
+    hql_format = "set parquet.compression={codec};" \
+        "insert into table %s.alltypes_multi_compression" \
+        " partition (year = {year}, month = {month})" \
+        " select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col," \
+        " float_col, double_col, date_string_col, string_col, timestamp_col" \
+        " from functional_parquet.alltypes" \
+        " where year = {year} and month = {month}" % unique_database
+    check_call(['hive', '-e', hql_format.format(codec="snappy", year=2010, month=1)])
+    check_call(['hive', '-e', hql_format.format(codec="gzip", year=2010, month=2)])
+
+    self.client.execute("create table %s.multi_compression (a string, b string)"
+        " stored as parquet" % unique_database)
+    multi_compression_tbl_loc =\
+        get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, "multi_compression"))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
+        multi_compression_tbl_loc])
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq",
+        multi_compression_tbl_loc])
+
+    vector.get_value('exec_option')['num_nodes'] = 1
+    self.run_test_case('QueryTest/hdfs_parquet_scan_node_profile',
+        vector, unique_database)
+
   def test_corrupt_rle_counts(self, vector, unique_database):
     """IMPALA-3646: Tests that a certain type of file corruption for plain
     dictionary encoded values is gracefully handled. Cases tested:
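The row_regex expectations above (e.g. PARQUET/\(GZIP,SNAPPY\):2) are produced by the multi-codec branch added to StopAndFinalizeCounters(). The following is a hedged standalone sketch of just that formatting step, not Impala code: the VALUES_TO_NAMES map is a hand-written stand-in for the Thrift-generated _THdfsCompression_VALUES_TO_NAMES, the integer codec values are illustrative, and a std::set replaces the bitmap from the patch.

#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <string>

int main() {
  // Hand-written stand-in for the Thrift-generated value -> name map
  // _THdfsCompression_VALUES_TO_NAMES (subset, values illustrative).
  const std::map<int, std::string> VALUES_TO_NAMES = {
      {0, "NONE"}, {2, "GZIP"}, {5, "SNAPPY"}};

  const std::string file_format = "PARQUET";
  const std::set<int> compressions_set = {2, 5};  // the codecs seen in the scan ranges
  const int file_cnt = 2;                         // number of ranges with this codec mix

  std::stringstream ss;
  if (compressions_set.size() == 1) {
    // Single codec: same format as before the patch, e.g. "PARQUET/SNAPPY:1 ".
    ss << file_format << "/" << VALUES_TO_NAMES.at(*compressions_set.begin()) << ":"
       << file_cnt << " ";
  } else {
    // Multiple codecs: print them comma-separated inside parentheses, iterating the
    // value -> name map so the output follows the stable enum order, as the patch does.
    ss << file_format << "/(";
    bool first = true;
    for (const auto& elem : VALUES_TO_NAMES) {
      if (compressions_set.count(elem.first) == 0) continue;
      if (!first) ss << ",";
      ss << elem.second;
      first = false;
    }
    ss << "):" << file_cnt << " ";
  }
  std::cout << ss.str() << std::endl;  // "PARQUET/(GZIP,SNAPPY):2 " (trailing space separator)
  return 0;
}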