IMPALA-5061: Populate null_count in parquet::statistics

The null_count in the statistics field is updated each time a null
value is encountered by parquet table writer. The value is written
to the parquet header if it has one or more null values in the
row_group.

Testing: Modified the existing end-to-end test in the
test_insert_parquet.py file to make sure each parquet header has
the appropriate null_count. Verified the correctness of the nulltable
test and added an additional test which populates a parquet file with
the functional_parquet.zipcode_incomes table and ensures that the
expected null_count is populated.

Change-Id: I4c49a63af84c2234f0633be63206cb52eb7e8ebb
Reviewed-on: http://gerrit.cloudera.org:8080/7058
Reviewed-by: Lars Volker <lv@cloudera.com>
Tested-by: Impala Public Jenkins
This commit is contained in:
poojanilangekar
2017-06-02 10:29:13 -07:00
committed by Impala Public Jenkins
parent 567814b4c9
commit 6d5cd6174e
4 changed files with 134 additions and 126 deletions

View File

@@ -141,8 +141,7 @@ class HdfsParquetTableWriter::BaseColumnWriter {
// 'meta_data'.
void EncodeRowGroupStats(ColumnMetaData* meta_data) {
DCHECK(row_group_stats_base_ != nullptr);
if (row_group_stats_base_->has_values()
&& row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
if (row_group_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
row_group_stats_base_->EncodeToThrift(&meta_data->statistics);
meta_data->__isset.statistics = true;
}
@@ -466,8 +465,12 @@ inline Status HdfsParquetTableWriter::BaseColumnWriter::AppendRow(TupleRow* row)
// TODO: Have a clearer set of state transitions here, to make it easier to see that
// this won't loop forever.
while (true) {
// Nulls don't get encoded.
if (value == nullptr) break;
// Nulls don't get encoded. Increment the null count of the parquet statistics.
if (value == nullptr) {
DCHECK(page_stats_base_ != nullptr);
page_stats_base_->IncrementNullCount(1);
break;
}
int64_t bytes_needed = 0;
if (ProcessValue(value, &bytes_needed)) {
@@ -679,8 +682,7 @@ void HdfsParquetTableWriter::BaseColumnWriter::FinalizeCurrentPage() {
// Build page statistics and add them to the header.
DCHECK(page_stats_base_ != nullptr);
if (page_stats_base_->has_values()
&& page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
if (page_stats_base_->BytesNeeded() <= MAX_COLUMN_STATS_SIZE) {
page_stats_base_->EncodeToThrift(&header.data_page_header.statistics);
header.data_page_header.__isset.statistics = true;
}

View File

@@ -62,7 +62,7 @@ class ColumnStatsBase {
/// the minimum or maximum value.
enum class StatsField { MIN, MAX };
ColumnStatsBase() : has_values_(false) {}
ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
virtual ~ColumnStatsBase() {}
/// Decodes the parquet::Statistics from 'col_chunk' and writes the value selected by
@@ -91,17 +91,22 @@ class ColumnStatsBase {
virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
/// Resets the state of this object.
void Reset() { has_values_ = false; }
void Reset();
bool has_values() const { return has_values_; }
/// Update the statistics by incrementing the null_count. It is called each time a null
/// value is appended to the column or the statistics are merged.
void IncrementNullCount(int64_t count) { null_count_ += count; }
protected:
// Copies the memory of 'value' into 'buffer' and make 'value' point to 'buffer'.
// 'buffer' is reset before making the copy.
static void CopyToBuffer(StringBuffer* buffer, StringValue* value);
/// Stores whether the current object has been initialized with a set of values.
bool has_values_;
/// Stores whether the min and max values of the current object have been initialized.
bool has_min_max_values_;
// Number of null values since the last call to Reset().
int64_t null_count_;
private:
/// Returns true if we support reading statistics stored in the fields 'min_value' and
@@ -149,10 +154,13 @@ class ColumnStats : public ColumnStatsBase {
min_buffer_(mem_pool),
max_buffer_(mem_pool) {}
/// Updates the statistics based on the value 'v'. If necessary, initializes the
/// statistics. It may keep a reference to 'v' until
/// Updates the statistics based on the values min_value and max_value. If necessary,
/// initializes the statistics. It may keep a reference to either value until
/// MaterializeStringValuesToInternalBuffers() gets called.
void Update(const T& v);
void Update(const T& min_value, const T& max_value);
/// Wrapper to call the Update function which takes in the min_value and max_value.
void Update(const T& v) { Update(v, v); }
virtual void Merge(const ColumnStatsBase& other) override;
virtual void MaterializeStringValuesToInternalBuffers() override {}

View File

@@ -24,15 +24,20 @@
namespace impala {
inline void ColumnStatsBase::Reset() {
has_min_max_values_ = false;
null_count_ = 0;
}
template <typename T>
inline void ColumnStats<T>::Update(const T& v) {
if (!has_values_) {
has_values_ = true;
min_value_ = v;
max_value_ = v;
inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) {
if (!has_min_max_values_) {
has_min_max_values_ = true;
min_value_ = min_value;
max_value_ = max_value;
} else {
min_value_ = std::min(min_value_, v);
max_value_ = std::max(max_value_, v);
min_value_ = std::min(min_value_, min_value);
max_value_ = std::max(max_value_, max_value);
}
}
@@ -40,31 +45,27 @@ template <typename T>
inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
if (!cs->has_values_) return;
if (!has_values_) {
has_values_ = true;
min_value_ = cs->min_value_;
max_value_ = cs->max_value_;
} else {
min_value_ = std::min(min_value_, cs->min_value_);
max_value_ = std::max(max_value_, cs->max_value_);
}
if (cs->has_min_max_values_) Update(cs->min_value_, cs->max_value_);
IncrementNullCount(cs->null_count_);
}
template <typename T>
inline int64_t ColumnStats<T>::BytesNeeded() const {
return BytesNeeded(min_value_) + BytesNeeded(max_value_);
return BytesNeeded(min_value_) + BytesNeeded(max_value_)
+ ParquetPlainEncoder::ByteSize(null_count_);
}
template <typename T>
inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
DCHECK(has_values_);
std::string min_str;
EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
out->__set_min_value(move(min_str));
std::string max_str;
EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
out->__set_max_value(move(max_str));
if (has_min_max_values_) {
std::string min_str;
EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
out->__set_min_value(move(min_str));
std::string max_str;
EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
out->__set_max_value(move(max_str));
}
out->__set_null_count(null_count_);
}
template <typename T>
@@ -145,44 +146,21 @@ inline bool ColumnStats<StringValue>::DecodePlainValue(
}
template <>
inline void ColumnStats<StringValue>::Update(const StringValue& v) {
if (!has_values_) {
has_values_ = true;
min_value_ = v;
inline void ColumnStats<StringValue>::Update(
const StringValue& min_value, const StringValue& max_value) {
if (!has_min_max_values_) {
has_min_max_values_ = true;
min_value_ = min_value;
min_buffer_.Clear();
max_value_ = v;
max_value_ = max_value;
max_buffer_.Clear();
} else {
if (v < min_value_) {
min_value_ = v;
if (min_value < min_value_) {
min_value_ = min_value;
min_buffer_.Clear();
}
if (v > max_value_) {
max_value_ = v;
max_buffer_.Clear();
}
}
}
template <>
inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
const ColumnStats<StringValue>* cs =
static_cast<const ColumnStats<StringValue>*>(&other);
if (!cs->has_values_) return;
if (!has_values_) {
has_values_ = true;
min_value_ = cs->min_value_;
min_buffer_.Clear();
max_value_ = cs->max_value_;
max_buffer_.Clear();
} else {
if (cs->min_value_ < min_value_) {
min_value_ = cs->min_value_;
min_buffer_.Clear();
}
if (cs->max_value_ > max_value_) {
max_value_ = cs->max_value_;
if (max_value > max_value_) {
max_value_ = max_value;
max_buffer_.Clear();
}
}

View File

@@ -67,7 +67,7 @@ class TimeStamp():
return self.timetuple == other_timetuple
ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max'])
ColumnStats = namedtuple('ColumnStats', ['name', 'min', 'max', 'null_count'])
# Test a smaller parquet file size as well
# TODO: these tests take a while so we don't want to go through too many sizes but
@@ -319,15 +319,17 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
if stats is None:
decoded.append(None)
continue
min_value = None
max_value = None
if stats.min_value is None and stats.max_value is None:
decoded.append(None)
continue
if stats.min_value is not None and stats.max_value is not None:
min_value = decode_stats_value(schema, stats.min_value)
max_value = decode_stats_value(schema, stats.max_value)
assert stats.min_value is not None and stats.max_value is not None
min_value = decode_stats_value(schema, stats.min_value)
max_value = decode_stats_value(schema, stats.max_value)
decoded.append(ColumnStats(schema.name, min_value, max_value))
null_count = stats.null_count
assert null_count is not None
decoded.append(ColumnStats(schema.name, min_value, max_value, null_count))
assert len(decoded) == len(schemas)
return decoded
@@ -367,7 +369,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
return row_group_stats
def _validate_min_max_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
def _validate_parquet_stats(self, hdfs_path, expected_values, skip_col_idxs = None):
"""Validates that 'hdfs_path' contains exactly one parquet file and that the rowgroup
statistics in that file match the values in 'expected_values'. Columns indexed by
'skip_col_idx' are excluded from the verification of the expected values.
@@ -408,7 +410,7 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
qualified_table_name, source_table)
vector.get_value('exec_option')['num_nodes'] = 1
self.execute_query(query, vector.get_value('exec_option'))
self._validate_min_max_stats(hdfs_path, expected_values)
self._validate_parquet_stats(hdfs_path, expected_values)
def test_write_statistics_alltypes(self, vector, unique_database):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
@@ -416,20 +418,20 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
"""
# Expected values for functional.alltypes
expected_min_max_values = [
ColumnStats('id', 0, 7299),
ColumnStats('bool_col', False, True),
ColumnStats('tinyint_col', 0, 9),
ColumnStats('smallint_col', 0, 9),
ColumnStats('int_col', 0, 9),
ColumnStats('bigint_col', 0, 90),
ColumnStats('float_col', 0, RoundFloat(9.9, 1)),
ColumnStats('double_col', 0, RoundFloat(90.9, 1)),
ColumnStats('date_string_col', '01/01/09', '12/31/10'),
ColumnStats('string_col', '0', '9'),
ColumnStats('id', 0, 7299, 0),
ColumnStats('bool_col', False, True, 0),
ColumnStats('tinyint_col', 0, 9, 0),
ColumnStats('smallint_col', 0, 9, 0),
ColumnStats('int_col', 0, 9, 0),
ColumnStats('bigint_col', 0, 90, 0),
ColumnStats('float_col', 0, RoundFloat(9.9, 1), 0),
ColumnStats('double_col', 0, RoundFloat(90.9, 1), 0),
ColumnStats('date_string_col', '01/01/09', '12/31/10', 0),
ColumnStats('string_col', '0', '9', 0),
ColumnStats('timestamp_col', TimeStamp('2009-01-01 00:00:00.0'),
TimeStamp('2010-12-31 05:09:13.860000')),
ColumnStats('year', 2009, 2010),
ColumnStats('month', 1, 12),
TimeStamp('2010-12-31 05:09:13.860000'), 0),
ColumnStats('year', 2009, 2010, 0),
ColumnStats('month', 1, 12, 0),
]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.alltypes",
@@ -441,12 +443,12 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
"""
# Expected values for functional.decimal_tbl
expected_min_max_values = [
ColumnStats('d1', 1234, 132842),
ColumnStats('d2', 111, 2222),
ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789')),
ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789')),
ColumnStats('d5', Decimal('0.1'), Decimal('12345.789')),
ColumnStats('d6', 1, 1)
ColumnStats('d1', 1234, 132842, 0),
ColumnStats('d2', 111, 2222, 0),
ColumnStats('d3', Decimal('1.23456789'), Decimal('12345.6789'), 0),
ColumnStats('d4', Decimal('0.123456789'), Decimal('0.123456789'), 0),
ColumnStats('d5', Decimal('0.1'), Decimal('12345.789'), 0),
ColumnStats('d6', 1, 1, 0)
]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.decimal_tbl",
@@ -458,31 +460,32 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
"""
# Expected values for tpch_parquet.customer
expected_min_max_values = [
ColumnStats('c_custkey', 1, 150000),
ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000'),
ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ'),
ColumnStats('c_nationkey', 0, 24),
ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881'),
ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99')),
ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY'),
ColumnStats('c_custkey', 1, 150000, 0),
ColumnStats('c_name', 'Customer#000000001', 'Customer#000150000', 0),
ColumnStats('c_address', ' 2uZwVhQvwA', 'zzxGktzXTMKS1BxZlgQ9nqQ', 0),
ColumnStats('c_nationkey', 0, 24, 0),
ColumnStats('c_phone', '10-100-106-1617', '34-999-618-6881', 0),
ColumnStats('c_acctbal', Decimal('-999.99'), Decimal('9999.99'), 0),
ColumnStats('c_mktsegment', 'AUTOMOBILE', 'MACHINERY', 0),
ColumnStats('c_comment', ' Tiresias according to the slyly blithe instructions '
'detect quickly at the slyly express courts. express dinos wake ',
'zzle. blithely regular instructions cajol'),
'zzle. blithely regular instructions cajol', 0),
]
self._ctas_table_and_verify_stats(vector, unique_database, "tpch_parquet.customer",
expected_min_max_values)
def test_write_statistics_null(self, vector, unique_database):
"""Test that we don't write min/max statistics for null columns."""
"""Test that we don't write min/max statistics for null columns. Ensure null_count
is set for columns with null values."""
expected_min_max_values = [
ColumnStats('a', 'a', 'a'),
ColumnStats('b', '', ''),
None,
None,
None,
ColumnStats('f', 'a\x00b', 'a\x00b'),
ColumnStats('g', '\x00', '\x00')
ColumnStats('a', 'a', 'a', 0),
ColumnStats('b', '', '', 0),
ColumnStats('c', None, None, 1),
ColumnStats('d', None, None, 1),
ColumnStats('e', None, None, 1),
ColumnStats('f', 'a\x00b', 'a\x00b', 0),
ColumnStats('g', '\x00', '\x00', 0)
]
self._ctas_table_and_verify_stats(vector, unique_database, "functional.nulltable",
@@ -503,9 +506,9 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
(cast("xy" as char(3)), "abc banana", "dolor dis amet")""".format(qualified_table_name)
self.execute_query(insert_stmt)
expected_min_max_values = [
ColumnStats('c3', 'abc', 'xy'),
ColumnStats('vc', 'abc banana', 'ghj xyz'),
ColumnStats('st', 'abc xyz', 'lorem ipsum')
ColumnStats('c3', 'abc', 'xy', 0),
ColumnStats('vc', 'abc banana', 'ghj xyz', 0),
ColumnStats('st', 'abc xyz', 'lorem ipsum', 0)
]
self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
@@ -528,11 +531,11 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
self.execute_query(create_view_stmt)
expected_min_max_values = [
ColumnStats('id', -7299, 7298),
ColumnStats('int_col', -9, 8),
ColumnStats('bigint_col', -90, 80),
ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1)),
ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1)),
ColumnStats('id', -7299, 7298, 0),
ColumnStats('int_col', -9, 8, 0),
ColumnStats('bigint_col', -90, 80, 0),
ColumnStats('float_col', RoundFloat(-9.9, 1), RoundFloat(8.8, 1), 0),
ColumnStats('double_col', RoundFloat(-90.9, 1), RoundFloat(80.8, 1), 0),
]
self._ctas_table_and_verify_stats(vector, unique_database, qualified_view_name,
@@ -586,9 +589,26 @@ class TestHdfsParquetTableStatsWriter(ImpalaTestSuite):
self.execute_query(insert_stmt)
expected_min_max_values = [
ColumnStats('f', float('-inf'), float('inf')),
ColumnStats('d', float('-inf'), float('inf')),
ColumnStats('f', float('-inf'), float('inf'), 0),
ColumnStats('d', float('-inf'), float('inf'), 0),
]
self._ctas_table_and_verify_stats(vector, unique_database, qualified_table_name,
expected_min_max_values)
def test_write_null_count_statistics(self, vector, unique_database):
"""Test that writing a parquet file populates the rowgroup statistics with the correct
null_count. This test ensures that the null_count is correct for a table with multiple
null values."""
# Expected values for tpch_parquet.customer
expected_min_max_values = [
ColumnStats('id', '8600000US00601', '8600000US999XX', 0),
ColumnStats('zip', '00601', '999XX', 0),
ColumnStats('description1', '\"00601 5-Digit ZCTA', '\"999XX 5-Digit ZCTA', 0),
ColumnStats('description2', ' 006 3-Digit ZCTA\"', ' 999 3-Digit ZCTA\"', 0),
ColumnStats('income', 0, 189570, 29),
]
self._ctas_table_and_verify_stats(vector, unique_database,
"functional_parquet.zipcode_incomes", expected_min_max_values)