From 588e1d46e9bd88d17676c47a0bf1237d3ebb11da Mon Sep 17 00:00:00 2001 From: Tim Armstrong Date: Fri, 9 Feb 2018 12:00:54 -0800 Subject: [PATCH] IMPALA-6324: Support reading RLE-encoded boolean values in Parquet scanner Impala already supported RLE encoding for levels and dictionary pages, so the only task was to integrate it into BoolColumnReader. A new benchmark, rle-benchmark.cc is added to test the speed of RLE decoding for different bit widths and run lengths. There might be a small performance impact on PLAIN encoded booleans, because of the additional branch when the cache of BoolColumnReader is filled. As the cache size is 128, I considered this to be outside the "hot loop". Testing: As Impala cannot write RLE encoded bool columns at the moment, parquet-mr was used to create a test file, testdata/data/rle_encoded_bool.parquet tests/query_test/test_scanners.py#test_rle_encoded_bools creates a table that uses this file, and tries to query from it. Change-Id: I4644bf8cf5d2b7238b05076407fbf78ab5d2c14f Reviewed-on: http://gerrit.cloudera.org:8080/9403 Reviewed-by: Tim Armstrong Tested-by: Impala Public Jenkins --- be/src/benchmarks/CMakeLists.txt | 1 + be/src/benchmarks/rle-benchmark.cc | 194 ++++++++++++++++++ be/src/exec/parquet-column-readers.cc | 78 ++++--- be/src/exec/parquet-column-readers.h | 6 +- be/src/util/rle-encoding.h | 37 ++++ be/src/util/rle-test.cc | 33 ++- testdata/data/README | 6 + testdata/data/rle_encoded_bool.parquet | Bin 0 -> 625 bytes .../QueryTest/parquet-rle-encoded-bool.test | 17 ++ tests/query_test/test_scanners.py | 12 ++ 10 files changed, 331 insertions(+), 53 deletions(-) create mode 100644 be/src/benchmarks/rle-benchmark.cc create mode 100644 testdata/data/rle_encoded_bool.parquet create mode 100644 testdata/workloads/functional-query/queries/QueryTest/parquet-rle-encoded-bool.test diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt index 02fbaad65..6957bd1a8 100644 --- a/be/src/benchmarks/CMakeLists.txt +++ b/be/src/benchmarks/CMakeLists.txt @@ -49,6 +49,7 @@ ADD_BE_BENCHMARK(network-perf-benchmark) ADD_BE_BENCHMARK(overflow-benchmark) ADD_BE_BENCHMARK(parse-timestamp-benchmark) ADD_BE_BENCHMARK(process-wide-locks-benchmark) +ADD_BE_BENCHMARK(rle-benchmark) ADD_BE_BENCHMARK(row-batch-serialize-benchmark) ADD_BE_BENCHMARK(scheduler-benchmark) ADD_BE_BENCHMARK(status-benchmark) diff --git a/be/src/benchmarks/rle-benchmark.cc b/be/src/benchmarks/rle-benchmark.cc new file mode 100644 index 000000000..9b5877c87 --- /dev/null +++ b/be/src/benchmarks/rle-benchmark.cc @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include + +#include "gutil/strings/substitute.h" +#include "util/benchmark.h" +#include "util/rle-encoding.h" +#include "util/cpu-info.h" + +#include "common/names.h" + +// Benchmark to measure the speed of Parquet RLE decoding for various bit widths and +// run lengths. Currently compares RleBatchDecoder used by Impala with an older version +// that used memset. + +// Machine Info: Intel(R) Core(TM) i5-6600 CPU @ 3.30GHz +// RLE decoding bit_width 1: Function iters/ms 10%ile 50%ile 90%ile 10%ile 50%ile 90%ile +// (relative) (relative) (relative) +// +// for loop / run length: 1 0.821 0.821 0.822 1X 1X 1X +// memset / run length: 1 0.824 0.836 0.836 1X 1.02X 1.02X +// for loop / run length: 10 0.4 0.4 0.4 0.487X 0.487X 0.486X +// memset / run length: 10 0.396 0.4 0.4 0.482X 0.487X 0.486X +// for loop / run length: 100 1.06 1.06 1.06 1.29X 1.29X 1.29X +// memset / run length: 100 1.04 1.04 1.04 1.26X 1.26X 1.26X +// for loop / run length: 1000 5.83 5.87 5.94 7.1X 7.14X 7.23X +// memset / run length: 1000 5.87 5.87 5.87 7.14X 7.14X 7.13X +// for loop / run length: 10000 9.53 9.54 9.54 11.6X 11.6X 11.6X +// memset / run length: 10000 9.54 9.54 9.54 11.6X 11.6X 11.6X + +using std::min; +using std::uniform_int_distribution; +using std::minstd_rand; + +using namespace impala; + +constexpr int MAX_BIT_WIDTH = 8; +constexpr int NUM_OUT_VALUES = 1024 * 1024; + +uint8_t out_buffer[NUM_OUT_VALUES]; + +/// RLE encodes NUM_OUT_VALUES number of bytes into the buffer. +/// The length of runs are pseudo random between 1 and max_run_length. +int FillWithRle(vector* buffer, int bit_width, int max_run_length) { + RleEncoder encoder(buffer->data(), buffer->size(), bit_width); + + uniform_int_distribution uniform_dist(1, max_run_length); + minstd_rand rand_engine; + uint8_t val = 0; + int run_length = 0; + for (int i = 0; i < NUM_OUT_VALUES; ++i) { + if (!encoder.Put(val)) { + LOG(ERROR) << Substitute( + "Error during RLE encoding. bit_widths: $0 max_run_length: $1", + bit_width, max_run_length); + } + if (run_length == 0) { + run_length = uniform_dist(rand_engine); + val = (val + 1) % (1 << bit_width); + } + --run_length; + } + return encoder.Flush(); +} + +struct BenchmarkParams { + int bit_width; + int max_run_length; + vector input_buffer; + int input_size; + + BenchmarkParams(int bit_width, int max_run_length) + : bit_width(bit_width), + max_run_length(max_run_length), + // Add some extra space for the possible overhead of RLE. + input_buffer(3 * NUM_OUT_VALUES * bit_width / 8) { + input_size = FillWithRle(&input_buffer, bit_width, max_run_length); + } +}; + + +/// Copy of the old version of RleBatchDecoder::GetValues() modified to get the +/// RleBatchDecoder as argument. +template +inline int32_t GetValuesMemset(int32_t num_values_to_consume, T* values, + RleBatchDecoder* decoder) { + int32_t num_consumed = 0; + while (num_consumed < num_values_to_consume) { + // Add RLE encoded values by repeating the current value this number of times. + uint32_t num_repeats = decoder->NextNumRepeats(); + if (num_repeats > 0) { + uint32_t num_repeats_to_set = + min(num_repeats, num_values_to_consume - num_consumed); + T repeated_value = decoder->GetRepeatedValue(num_repeats_to_set); + memset(values + num_consumed, repeated_value, num_repeats_to_set); + num_consumed += num_repeats_to_set; + continue; + } + + // Add remaining literal values, if any. + uint32_t num_literals = decoder->NextNumLiterals(); + if (num_literals == 0) break; + uint32_t num_literals_to_set = + min(num_literals, num_values_to_consume - num_consumed); + if (!decoder->GetLiteralValues(num_literals_to_set, values + num_consumed)) { + DCHECK(false); + return 0; + } + num_consumed += num_literals_to_set; + } + return num_consumed; +} + +/// Benchmark calling RleBatchDecoder::GetValues(). +void RleBenchmark(int batch_size, void* data) { + for (int i = 0; i < batch_size; ++i) { + BenchmarkParams* p = reinterpret_cast(data); + RleBatchDecoder decoder(p->input_buffer.data(), p->input_size, p->bit_width); + int result = decoder.GetValues(NUM_OUT_VALUES, out_buffer); + if (result != NUM_OUT_VALUES) { + LOG(ERROR) << Substitute( + "Error in GetValues(). bit_width: $0 max_run_length: $1 " + "expected number of values: $2 decoded number of values: $3", + p->bit_width, p->max_run_length, NUM_OUT_VALUES, result); + exit(1); + } + } +} + +/// Benchmark calling the old version of RleBatchDecoder::GetValues() that used +/// memset() for setting repeated values. +void RleBenchmarkMemset(int batch_size, void* data) { + for (int i = 0; i < batch_size; ++i) { + BenchmarkParams* p = reinterpret_cast(data); + RleBatchDecoder decoder(p->input_buffer.data(), p->input_size, p->bit_width); + int result = GetValuesMemset(NUM_OUT_VALUES, out_buffer, &decoder); + if (result != NUM_OUT_VALUES) { + LOG(ERROR) << Substitute( + "Error in GetValuesMemset(). bit_width: $0 max_run_length: $1 " + "expected number of values: $2 decoded number of values: $3", + p->bit_width, p->max_run_length, NUM_OUT_VALUES, result); + exit(1); + } + } +} + +struct RleBenchmarks { + BenchmarkParams params; + + RleBenchmarks(Benchmark* suite, int bit_width, int run_length) + : params(bit_width, run_length) { + suite->AddBenchmark( + Substitute("for loop / max run length: $0", run_length), + RleBenchmark, ¶ms); + suite->AddBenchmark( + Substitute("memset / max run length: $0", run_length), + RleBenchmarkMemset, ¶ms); + } +}; + +int main(int argc, char **argv) { + CpuInfo::Init(); + cout << endl << Benchmark::GetMachineInfo() << endl; + + for (int bit_width = 1; bit_width <= MAX_BIT_WIDTH; ++bit_width) { + Benchmark suite(Substitute("RLE decoding bit_width $0", bit_width)); + + RleBenchmarks b1(&suite, bit_width, 1); + RleBenchmarks b10(&suite, bit_width, 10); + RleBenchmarks b100(&suite, bit_width, 100); + RleBenchmarks b1000(&suite, bit_width, 1000); + RleBenchmarks b10000(&suite, bit_width, 10000); + + cout << suite.Measure() << endl; + } + return 0; +} diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index 7647c73b0..5c74e5bc1 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -181,34 +181,8 @@ bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) { return true; } DCHECK_EQ(encoding_, parquet::Encoding::RLE); - return FillCacheRle(batch_size, num_cached_levels); -} - -bool ParquetLevelDecoder::FillCacheRle(int batch_size, int* num_cached_levels) { - int num_values = 0; - while (num_values < batch_size) { - // Add RLE encoded values by repeating the current value this number of times. - uint32_t num_repeats = rle_decoder_.NextNumRepeats(); - if (num_repeats > 0) { - uint32_t num_repeats_to_set = min(num_repeats, batch_size - num_values); - uint8_t repeated_value = rle_decoder_.GetRepeatedValue(num_repeats_to_set); - memset(cached_levels_ + num_values, repeated_value, num_repeats_to_set); - num_values += num_repeats_to_set; - continue; - } - - // Add remaining literal values, if any. - uint32_t num_literals = rle_decoder_.NextNumLiterals(); - if (num_literals == 0) break; - uint32_t num_literals_to_set = min(num_literals, batch_size - num_values); - if (!rle_decoder_.GetLiteralValues( - num_literals_to_set, &cached_levels_[num_values])) { - return false; - } - num_values += num_literals_to_set; - } - *num_cached_levels = num_values; - return true; + *num_cached_levels = rle_decoder_.GetValues(batch_size, cached_levels_); + return *num_cached_levels > 0; } /// Per column type reader. InternalType is the datatype that Impala uses internally to @@ -481,11 +455,7 @@ class ScalarColumnReader : public BaseScalarColumnReader { page_encoding_ = current_page_header_.data_page_header.encoding; if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY && page_encoding_ != parquet::Encoding::PLAIN) { - stringstream ss; - ss << "File '" << filename() << "' is corrupt: unexpected encoding: " - << PrintEncoding(page_encoding_) << " for data page of column '" - << schema_element().name << "'."; - return Status(ss.str()); + return GetUnsupportedDecodingError(); } // If slot_desc_ is NULL, dict_decoder_ is uninitialized @@ -690,8 +660,21 @@ class BoolColumnReader : public BaseScalarColumnReader { virtual void ClearDictionaryDecoder() { } virtual Status InitDataPage(uint8_t* data, int size) { - // Initialize bool decoder - bool_values_.Reset(data, size); + page_encoding_ = current_page_header_.data_page_header.encoding; + // Only the relevant decoder is initialized for a given data page. + switch (page_encoding_) { + case parquet::Encoding::PLAIN: + bool_values_.Reset(data, size); + break; + case parquet::Encoding::RLE: + // The first 4 bytes contain the size of the encoded data. This information is + // redundant, as this is the last part of the data page, and the number of + // remaining bytes is already known. + rle_decoder_.Reset(data + 4, size - 4, 1); + break; + default: + return GetUnsupportedDecodingError(); + } num_unpacked_values_ = 0; unpacked_value_idx_ = 0; return Status::OK(); @@ -726,19 +709,24 @@ class BoolColumnReader : public BaseScalarColumnReader { inline bool ReadSlot(Tuple* tuple, MemPool* pool) { void* slot = tuple->GetSlot(tuple_offset_); bool val; - if (unpacked_value_idx_ < num_unpacked_values_) { + if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) { val = unpacked_values_[unpacked_value_idx_++]; } else { // Unpack as many values as we can into the buffer. We expect to read at least one // value. - int num_unpacked = - bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); - if (UNLIKELY(num_unpacked == 0)) { + if (page_encoding_ == parquet::Encoding::PLAIN) { + num_unpacked_values_ = + bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + } else { + num_unpacked_values_ = + rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + } + + if (UNLIKELY(num_unpacked_values_ == 0)) { parent_->parse_status_ = Status("Invalid bool column."); return false; } val = unpacked_values_[0]; - num_unpacked_values_ = num_unpacked; unpacked_value_idx_ = 1; } *reinterpret_cast(slot) = val; @@ -756,7 +744,11 @@ class BoolColumnReader : public BaseScalarColumnReader { /// The next value to return from 'unpacked_values_'. int unpacked_value_idx_ = 0; + /// Bit packed decoder, used if 'encoding_' is PLAIN. BatchedBitReader bool_values_; + + /// RLE decoder, used if 'encoding_' is RLE. + RleBatchDecoder rle_decoder_; }; // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling @@ -1216,6 +1208,12 @@ bool BaseScalarColumnReader::NextLevels() { return parent_->parse_status_.ok(); } +Status BaseScalarColumnReader::GetUnsupportedDecodingError() { + return Status(Substitute( + "File '$0' is corrupt: unexpected encoding: $1 for data page of column '$2'.", + filename(), PrintEncoding(page_encoding_), schema_element().name)); +} + bool BaseScalarColumnReader::NextPage() { parent_->assemble_rows_timer_.Stop(); parent_->parse_status_ = ReadDataPage(); diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 8724f43e5..0ff5ce76a 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -93,9 +93,6 @@ class ParquetLevelDecoder { /// CacheHasNext() is false. bool FillCache(int batch_size, int* num_cached_levels); - /// Implementation of FillCache() for RLE encoding. - bool FillCacheRle(int batch_size, int* num_cached_levels); - /// RLE decoder, used if 'encoding_' is RLE. RleBatchDecoder rle_decoder_; @@ -509,6 +506,9 @@ class BaseScalarColumnReader : public ParquetColumnReader { /// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed value. void __attribute__((noinline)) SetLevelDecodeError(const char* level_name, int decoded_level, int max_level); + + // Returns a detailed error message about unsupported encoding. + Status GetUnsupportedDecodingError(); }; /// Collections are not materialized directly in parquet files; only scalar values appear diff --git a/be/src/util/rle-encoding.h b/be/src/util/rle-encoding.h index 861bf8e64..83b631506 100644 --- a/be/src/util/rle-encoding.h +++ b/be/src/util/rle-encoding.h @@ -134,6 +134,10 @@ class RleBatchDecoder { /// decoding the values. bool GetSingleValue(T* val) WARN_UNUSED_RESULT; + /// Consume 'num_values_to_consume' values and copy them to 'values'. + /// Returns the number of consumed values or 0 if an error occurred. + int32_t GetValues(int32_t num_values_to_consume, T* values); + private: BatchedBitReader bit_reader_; @@ -654,6 +658,39 @@ inline int32_t RleBatchDecoder::DecodeBufferedLiterals( return num_to_output; } +template +inline int32_t RleBatchDecoder::GetValues(int32_t num_values_to_consume, T* values) { + DCHECK_GT(num_values_to_consume, 0); + DCHECK(values != nullptr); + + int32_t num_consumed = 0; + while (num_consumed < num_values_to_consume) { + // Add RLE encoded values by repeating the current value this number of times. + uint32_t num_repeats = NextNumRepeats(); + if (num_repeats > 0) { + uint32_t num_repeats_to_set = + std::min(num_repeats, num_values_to_consume - num_consumed); + T repeated_value = GetRepeatedValue(num_repeats_to_set); + for (int i = 0; i < num_repeats_to_set; ++i) { + values[num_consumed + i] = repeated_value; + } + num_consumed += num_repeats_to_set; + continue; + } + + // Add remaining literal values, if any. + uint32_t num_literals = NextNumLiterals(); + if (num_literals == 0) break; + uint32_t num_literals_to_set = + std::min(num_literals, num_values_to_consume - num_consumed); + if (!GetLiteralValues(num_literals_to_set, values + num_consumed)) { + return 0; + } + num_consumed += num_literals_to_set; + } + return num_consumed; +} + template constexpr int RleBatchDecoder::LITERAL_BUFFER_LEN; } diff --git a/be/src/util/rle-test.cc b/be/src/util/rle-test.cc index b8a1d94b1..217d43e4f 100644 --- a/be/src/util/rle-test.cc +++ b/be/src/util/rle-test.cc @@ -145,7 +145,7 @@ TEST(BitArray, TestValues) { } } -/// Get many values from a batch RLE decoder. +/// Get many values from a batch RLE decoder using its low level functions. template static bool GetRleValues(RleBatchDecoder* decoder, int num_vals, T* vals) { int decoded = 0; @@ -173,6 +173,12 @@ static bool GetRleValues(RleBatchDecoder* decoder, int num_vals, T* vals) { return true; } +/// Get many values from a batch RLE decoder using its GetValues() function. +template +static bool GetRleValuesBatched(RleBatchDecoder* decoder, int num_vals, T* vals) { + return num_vals == decoder->GetValues(num_vals, vals); +} + // Validates encoding of values by encoding and decoding them. If // expected_encoding != NULL, also validates that the encoded buffer is // exactly 'expected_encoding'. @@ -198,23 +204,30 @@ void ValidateRle(const vector& values, int bit_width, } // Verify read - RleBatchDecoder decoder(buffer, len, bit_width); - RleBatchDecoder decoder2(buffer, len, bit_width); + RleBatchDecoder per_value_decoder(buffer, len, bit_width); + RleBatchDecoder per_run_decoder(buffer, len, bit_width); + RleBatchDecoder batch_decoder(buffer, len, bit_width); // Ensure it returns the same results after Reset(). for (int trial = 0; trial < 2; ++trial) { for (int i = 0; i < values.size(); ++i) { uint64_t val; - EXPECT_TRUE(decoder.GetSingleValue(&val)); + EXPECT_TRUE(per_value_decoder.GetSingleValue(&val)); EXPECT_EQ(values[i], val) << i; } - // Unpack everything at once from the second batch decoder. - vector decoded_values(values.size()); - EXPECT_TRUE(GetRleValues(&decoder2, values.size(), decoded_values.data())); + // Unpack everything at once from the other decoders. + vector decoded_values1(values.size()); + vector decoded_values2(values.size()); + EXPECT_TRUE(GetRleValues( + &per_run_decoder, decoded_values1.size(), decoded_values1.data())); + EXPECT_TRUE(GetRleValuesBatched( + &batch_decoder, decoded_values2.size(), decoded_values2.data())); for (int i = 0; i < values.size(); ++i) { - EXPECT_EQ(values[i], decoded_values[i]) << i; + EXPECT_EQ(values[i], decoded_values1[i]) << i; + EXPECT_EQ(values[i], decoded_values2[i]) << i; } - decoder.Reset(buffer, len, bit_width); - decoder2.Reset(buffer, len, bit_width); + per_value_decoder.Reset(buffer, len, bit_width); + per_run_decoder.Reset(buffer, len, bit_width); + batch_decoder.Reset(buffer, len, bit_width); } } diff --git a/testdata/data/README b/testdata/data/README index 928d435a7..ac260ef1d 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -168,3 +168,9 @@ A file with a single boolean column with page metadata reporting 2 values but on levels for a single literal value. Generated by hacking Impala's parquet writer to increment page.header.data_page_header.num_values. This caused Impala to hit a DCHECK (IMPALA-6589). + +rle_encoded_bool.parquet: +Parquet v1 file with RLE encoded boolean column "b" and int column "i". +Created for IMPALA-6324, generated with modified parquet-mr. Contains 279 rows, +139 with value false, and 140 with value true. "i" is always 1 if "b" is True +and always 0 if "b" is false. diff --git a/testdata/data/rle_encoded_bool.parquet b/testdata/data/rle_encoded_bool.parquet new file mode 100644 index 0000000000000000000000000000000000000000..3e24e64b6a3c199a5086f4e78f705ccec0479c1b GIT binary patch literal 625 zcmZ`%F>BjE6uxt>MuZM2;lv#nA}++Vpa=^)X=J>3s)x{+Btsx1Sh_n=fh;Am>{5cK zb_oetx^(K)*;`A;(4~d`gF^cY`Xll#DTXxk-SBwtd*AoHht%)x_!QAP-E!%S&?-Eu z#xQ0W*$86!+%gG5#)HBBx3Ayv#|0KQd`~Kbe!rjnSOSa^x=34FkjNd%0D?s5(U%Xu zj0_k#w0C1@@eJyqEV^vPVZrq=V=Lc6s=Ymj-YFD zg6tXLHSX4}8by4yYH&330D7u!2ofWJ$_Xrr7pZ-u5{{U;ZQRYPKq3{rQ@(T8SBzBr8kl<^)O`b z@Tn+YA8XMTg_e1tgewdQ>YMwZj1!%zk!T;B&JpO|k`6>RPE#n&unseE=nAFdaGaK+ zZJ?<;XC?PpNmkxx98MA2`Mc}duy7{27$te;_+H@oYr8MH{oUU7p7SIcC#iB;DhM`| nABED7B7bvpL#jZwo;9Pe(a`Hn-IRVH