mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-3841: Enable late materialization for collections
This patch enables late materialization for collections to avoid the cost of materializing collections that will never be accessed by the query. For a collection column, late materialization takes effect only when the collection column is not used in any predicate, including the `!empty()` predicate added by the planner. Otherwise we need to read every row to evaluate the predicate and cannot skip any. Therefore, this patch skips registering the `!empty()` predicates if the query contains zipping unnests. This can affect performance if the table contains many empty collections, but should be noticeable only in very extreme cases. The late materialization threshold is set to 1 in HdfsParquetScanner when there is any collection that can be skipped. This patch also adds the detail of `HdfsScanner::parse_status_` to the error message returned by the HdfsParquetScanner to help figure out the root cause. Performance: - Tests with the queries involving collection columns in table `tpch_nested_parquet.customer` show that when the selectivity is low, the single-threaded (1 impalad and MT_DOP=1) scanning time can be reduced by about 50%, while when the selectivity is high, the scanning time almost does not change. - For queries not involving collections, performance A/B testing shows no regression on TPC-H. Testing: - Added a runtime profile counter NumTopLevelValuesSkipped to record the total number of top-level values skipped for all columns. The counter only counts the values that are not skipped as a page. - Added e2e test cases in test_parquet_late_materialization.py to ensure that late materialization works using the new counter. Change-Id: Ia21bdfa6811408d66d74367e0a9520e20951105f Reviewed-on: http://gerrit.cloudera.org:8080/22662 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
4d9612f514
commit
607bad042a
@@ -155,6 +155,8 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
|
||||
num_pages_skipped_by_late_materialization_counter_ =
|
||||
ADD_COUNTER(scan_node_->runtime_profile(), "NumPagesSkippedByLateMaterialization",
|
||||
TUnit::UNIT);
|
||||
num_top_level_values_skipped_counter_ =
|
||||
ADD_COUNTER(scan_node_->runtime_profile(), "NumTopLevelValuesSkipped", TUnit::UNIT);
|
||||
num_dict_filtered_row_groups_counter_ =
|
||||
ADD_COUNTER(scan_node_->runtime_profile(), "NumDictFilteredRowGroups", TUnit::UNIT);
|
||||
parquet_compressed_page_size_counter_ = ADD_SUMMARY_STATS_COUNTER(
|
||||
@@ -257,15 +259,18 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
|
||||
}
|
||||
DivideFilterAndNonFilterColumnReaders(column_readers_, &filter_readers_,
|
||||
&non_filter_readers_);
|
||||
// Set the late materialization threshold to 1 if
|
||||
// - late materialization is enabled, and
|
||||
// - there is any collection that can be skipped.
|
||||
if (late_materialization_threshold_ >= 0
|
||||
&& std::find_if(non_filter_readers_.begin(), non_filter_readers_.end(),
|
||||
[](ParquetColumnReader* reader) { return reader->IsCollectionReader(); })
|
||||
!= non_filter_readers_.end()) {
|
||||
late_materialization_threshold_ = 1;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
// Currently, Collection Readers and scalar readers upon collection values
|
||||
// are not supported for late materialization.
|
||||
static bool DoesNotSupportLateMaterialization(ParquetColumnReader* column_reader) {
|
||||
return column_reader->IsCollectionReader() || column_reader->max_rep_level() > 0;
|
||||
}
|
||||
|
||||
void HdfsParquetScanner::DivideFilterAndNonFilterColumnReaders(
|
||||
const vector<ParquetColumnReader*>& column_readers,
|
||||
vector<ParquetColumnReader*>* filter_readers,
|
||||
@@ -274,9 +279,10 @@ void HdfsParquetScanner::DivideFilterAndNonFilterColumnReaders(
|
||||
non_filter_readers->clear();
|
||||
for (auto column_reader : column_readers) {
|
||||
auto slot_desc = column_reader->slot_desc();
|
||||
if (DoesNotSupportLateMaterialization(column_reader) || (slot_desc != nullptr &&
|
||||
std::find(conjunct_slot_ids_.begin(), conjunct_slot_ids_.end(), slot_desc->id())
|
||||
!= conjunct_slot_ids_.end())) {
|
||||
if (slot_desc != nullptr
|
||||
&& std::find(
|
||||
conjunct_slot_ids_.begin(), conjunct_slot_ids_.end(), slot_desc->id())
|
||||
!= conjunct_slot_ids_.end()) {
|
||||
filter_readers->push_back(column_reader);
|
||||
} else {
|
||||
non_filter_readers->push_back(column_reader);
|
||||
@@ -2513,10 +2519,9 @@ Status HdfsParquetScanner::SkipRowsForColumns(
|
||||
// among columns.
|
||||
if (UNLIKELY(!col_reader->SkipRows(*num_rows_to_skip, *skip_to_row))) {
|
||||
return Status(Substitute(
|
||||
"Parquet file might be corrupted: Error in skipping $0 values to row $1 "
|
||||
"in column $2 of file $3.",
|
||||
"Error in skipping $0 values to row $1 in column $2 of file $3. Detail: $4",
|
||||
*num_rows_to_skip, *skip_to_row, col_reader->schema_element().name,
|
||||
filename()));
|
||||
filename(), parse_status_.GetDetail()));
|
||||
}
|
||||
}
|
||||
*num_rows_to_skip = 0;
|
||||
@@ -2544,13 +2549,14 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
|
||||
if (micro_batches[0].start > 0) {
|
||||
if (UNLIKELY(!col_reader->SkipRows(micro_batches[0].start, -1))) {
|
||||
return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
|
||||
col_reader->schema_element().name, filename()));
|
||||
col_reader->schema_element().name, filename(),
|
||||
parse_status_.GetDetail()));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (UNLIKELY(!col_reader->SkipRows(micro_batches[r].start - last - 1, -1))) {
|
||||
return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
|
||||
col_reader->schema_element().name, filename()));
|
||||
col_reader->schema_element().name, filename(), parse_status_.GetDetail()));
|
||||
}
|
||||
}
|
||||
// Ensure that the length of the micro_batch is less than
|
||||
@@ -2587,7 +2593,7 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
|
||||
if (UNLIKELY(last < max_num_tuples - 1)) {
|
||||
if (UNLIKELY(!col_reader->SkipRows(max_num_tuples - 1 - last, -1))) {
|
||||
return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
|
||||
col_reader->schema_element().name, filename()));
|
||||
col_reader->schema_element().name, filename(), parse_status_.GetDetail()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -539,6 +539,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
|
||||
/// rows that survived filtering.
|
||||
RuntimeProfile::Counter* num_pages_skipped_by_late_materialization_counter_;
|
||||
|
||||
/// Sum of the numbers of top-level values skipped for all columns.
|
||||
/// It only counts the values that are not skipped as a page.
|
||||
RuntimeProfile::Counter* num_top_level_values_skipped_counter_;
|
||||
|
||||
/// Number of row groups skipped due to dictionary filter. This is an aggregated counter
|
||||
/// that includes the number of filtered row groups as a result of evaluating conjuncts
|
||||
/// and runtime bloom filters on the dictionary entries.
|
||||
|
||||
@@ -25,11 +25,16 @@ bool CollectionColumnReader::NextLevels() {
|
||||
DCHECK(!children_.empty());
|
||||
DCHECK_LE(rep_level_, new_collection_rep_level());
|
||||
for (int c = 0; c < children_.size(); ++c) {
|
||||
if (children_[c]->IsComplexReader()
|
||||
&& static_cast<ComplexColumnReader*>(children_[c])->next_levels_consumed()) {
|
||||
continue;
|
||||
}
|
||||
do {
|
||||
// TODO: verify somewhere that all column readers are at end
|
||||
if (!children_[c]->NextLevels()) return false;
|
||||
} while (children_[c]->rep_level() > new_collection_rep_level());
|
||||
}
|
||||
next_levels_consumed_ = true;
|
||||
UpdateDerivedState();
|
||||
return true;
|
||||
}
|
||||
@@ -41,10 +46,12 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
|
||||
<< "Caller should have called NextLevels() until we are ready to read a value";
|
||||
|
||||
if (tuple_offset_ == -1) {
|
||||
SetDescendantsNextLevelsConsumed(false);
|
||||
return CollectionColumnReader::NextLevels();
|
||||
} else if (def_level_ >= max_def_level()) {
|
||||
return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
|
||||
} else {
|
||||
SetDescendantsNextLevelsConsumed(false);
|
||||
// Collections add an extra def level, so it is possible to distinguish between
|
||||
// NULL and empty collections. See hdfs-parquet-scanner.h for more detailed
|
||||
// explanation.
|
||||
@@ -74,6 +81,7 @@ bool CollectionColumnReader::ReadValueBatch(MemPool* pool, int max_values,
|
||||
while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
|
||||
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
|
||||
if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
|
||||
SetDescendantsNextLevelsConsumed(false);
|
||||
// A containing repeated field is empty or NULL
|
||||
continue_execution = NextLevels();
|
||||
continue;
|
||||
@@ -164,10 +172,12 @@ void CollectionColumnReader::UpdateDerivedState() {
|
||||
|
||||
bool CollectionColumnReader::SkipRows(int64_t num_rows, int64_t skip_row_id) {
|
||||
DCHECK(!children_.empty());
|
||||
// Prevent NextLevels() from being called more than once in the recursion when
|
||||
// e.g., a child is also a CollectionColumnReader.
|
||||
next_levels_consumed_ = false;
|
||||
for (int c = 0; c < children_.size(); ++c) {
|
||||
if (!children_[c]->SkipRows(num_rows, skip_row_id)) return false;
|
||||
}
|
||||
UpdateDerivedState();
|
||||
return true;
|
||||
return CollectionColumnReader::NextLevels();
|
||||
}
|
||||
} // namespace impala
|
||||
|
||||
@@ -1077,6 +1077,12 @@ void BaseScalarColumnReader::Close(RowBatch* row_batch) {
|
||||
col_chunk_reader_.Close(row_batch == nullptr ? nullptr : row_batch->tuple_data_pool());
|
||||
DictDecoderBase* dict_decoder = GetDictionaryDecoder();
|
||||
if (dict_decoder != nullptr) dict_decoder->Close();
|
||||
if (num_top_level_values_skipped_counter_ > 0) {
|
||||
// This can happen when the reader gets closed before finishing the current page.
|
||||
COUNTER_ADD(parent_->num_top_level_values_skipped_counter_,
|
||||
num_top_level_values_skipped_counter_);
|
||||
num_top_level_values_skipped_counter_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
Status BaseScalarColumnReader::InitDictionary() {
|
||||
@@ -1233,7 +1239,6 @@ template <bool ADVANCE_REP_LEVEL>
|
||||
bool BaseScalarColumnReader::NextLevels() {
|
||||
if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
|
||||
|
||||
levels_readahead_ = true;
|
||||
if (UNLIKELY(num_buffered_values_ == 0)) {
|
||||
if (!NextPage()) return parent_->parse_status_.ok();
|
||||
}
|
||||
@@ -1279,9 +1284,12 @@ bool BaseScalarColumnReader::NextLevels() {
|
||||
++current_row_;
|
||||
}
|
||||
|
||||
levels_readahead_ = true;
|
||||
return parent_->parse_status_.ok();
|
||||
}
|
||||
|
||||
template bool BaseScalarColumnReader::NextLevels<true>();
|
||||
|
||||
void BaseScalarColumnReader::ResetPageFiltering() {
|
||||
offset_index_.page_locations.clear();
|
||||
candidate_data_pages_.clear();
|
||||
@@ -1307,7 +1315,7 @@ Status BaseScalarColumnReader::StartPageFiltering() {
|
||||
int64_t remaining = 0;
|
||||
if (!SkipTopLevelRows(skip_rows, &remaining)) {
|
||||
return Status(ErrorMsg(TErrorCode::PARQUET_ROWS_SKIPPING,
|
||||
schema_element().name, filename()));
|
||||
schema_element().name, filename(), parent_->parse_status_.GetDetail()));
|
||||
}
|
||||
DCHECK_EQ(remaining, 0);
|
||||
DCHECK_EQ(current_row_, range_start - 1);
|
||||
@@ -1334,8 +1342,11 @@ bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows, int64_t* remaini
|
||||
current_row_ += rows_skipped;
|
||||
num_buffered_values_ -= rows_skipped;
|
||||
*remaining = num_rows - rows_skipped;
|
||||
// Increase the counter before returning when we successfully skip the rows.
|
||||
num_top_level_values_skipped_counter_ += rows_skipped;
|
||||
return SkipEncodedValuesInPage(rows_skipped);
|
||||
}
|
||||
int64_t num_rows_to_skip = num_rows;
|
||||
int64_t num_values_to_skip = 0;
|
||||
if (max_rep_level() == 0) {
|
||||
// No nesting, but field is not required.
|
||||
@@ -1391,6 +1402,9 @@ bool BaseScalarColumnReader::SkipTopLevelRows(int64_t num_rows, int64_t* remaini
|
||||
}
|
||||
*remaining = num_rows;
|
||||
}
|
||||
DCHECK_LT(*remaining, num_rows_to_skip);
|
||||
// Increase the counter before returning when we successfully skip the rows.
|
||||
num_top_level_values_skipped_counter_ += (num_rows_to_skip - *remaining);
|
||||
return SkipEncodedValuesInPage(num_values_to_skip);
|
||||
}
|
||||
|
||||
@@ -1520,6 +1534,11 @@ Status BaseScalarColumnReader::HandleTooEarlyEos() {
|
||||
|
||||
bool BaseScalarColumnReader::NextPage() {
|
||||
parent_->assemble_rows_timer_.Stop();
|
||||
if (num_top_level_values_skipped_counter_ > 0) {
|
||||
COUNTER_ADD(parent_->num_top_level_values_skipped_counter_,
|
||||
num_top_level_values_skipped_counter_);
|
||||
num_top_level_values_skipped_counter_ = 0;
|
||||
}
|
||||
parent_->parse_status_ = ReadDataPage();
|
||||
if (UNLIKELY(!parent_->parse_status_.ok())) return false;
|
||||
if (num_buffered_values_ == 0) {
|
||||
@@ -1644,6 +1663,9 @@ bool BaseScalarColumnReader::SkipRowsInternal(int64_t num_rows, int64_t skip_row
|
||||
// Keep advancing to next page header if rows to be skipped are more than number
|
||||
// of values in the page. Note we will just be reading headers and skipping
|
||||
// pages without decompressing them as we advance.
|
||||
// In this case, the current column is not in any collection. Therefore the number
|
||||
// of rows in the page is equal to the number of values.
|
||||
DCHECK_EQ(max_rep_level(), 0);
|
||||
while (num_rows > num_buffered_values_) {
|
||||
COUNTER_ADD(parent_->num_pages_skipped_by_late_materialization_counter_, 1);
|
||||
num_rows -= num_buffered_values_;
|
||||
|
||||
@@ -189,7 +189,7 @@ class ParquetColumnReader {
|
||||
/// Skips the number of encoded values specified by 'num_rows', without materializing or
|
||||
/// decoding them across pages. If page filtering is enabled, then it directly skips to
|
||||
/// row after 'skip_row_id' and ignores 'num_rows'.
|
||||
/// It invokes 'SkipToLevelRows' for all 'children_'.
|
||||
///
|
||||
/// Returns true on success, false otherwise.
|
||||
virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) = 0;
|
||||
|
||||
@@ -249,6 +249,13 @@ class ParquetColumnReader {
|
||||
/// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values
|
||||
/// ParquetLevel::INVALID_LEVEL and ParquetLevel::ROW_GROUP_END. The maximum values are
|
||||
/// cached here because they are accessed in inner loops.
|
||||
///
|
||||
/// See ParquetSchemaResolver::CreateSchemaTree() for how max_def_level_ and
|
||||
/// max_rep_level_ are computed.
|
||||
///
|
||||
/// Some usages:
|
||||
/// - def_level_ >= max_def_level() means the current value is defined, i.e. not NULL.
|
||||
/// - rep_level_ == 0 means the current value is at the beginning of a top-level row.
|
||||
int16_t rep_level_;
|
||||
const int16_t max_rep_level_;
|
||||
int16_t def_level_;
|
||||
@@ -591,6 +598,17 @@ class BaseScalarColumnReader : public ParquetColumnReader {
|
||||
/// is more than the rows left in current row group. It can happen even with corrupt
|
||||
/// parquet file where number of values might differ from metadata.
|
||||
virtual bool SkipRows(int64_t num_rows, int64_t skip_row_id) override {
|
||||
// Undo NextLevels() if it is called before calling this method.
|
||||
if (levels_readahead_) {
|
||||
DCHECK_NE(def_level_, ParquetLevel::INVALID_LEVEL);
|
||||
DCHECK_NE(rep_level_, ParquetLevel::INVALID_LEVEL);
|
||||
rep_levels_.CachePrev();
|
||||
def_levels_.CachePrev();
|
||||
rep_level_ = ParquetLevel::INVALID_LEVEL;
|
||||
def_level_ = ParquetLevel::INVALID_LEVEL;
|
||||
++num_buffered_values_;
|
||||
levels_readahead_ = false;
|
||||
}
|
||||
if (max_rep_level() > 0) {
|
||||
return SkipRowsInternal<true>(num_rows, skip_row_id);
|
||||
} else {
|
||||
@@ -693,6 +711,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
|
||||
|
||||
Status LogCorruptNumValuesInMetadataError();
|
||||
Status HandleTooEarlyEos();
|
||||
|
||||
/// Non-thread-safe version of
|
||||
/// HdfsParquetScanner::num_top_level_values_skipped_counter_.
|
||||
int64_t num_top_level_values_skipped_counter_ = 0;
|
||||
};
|
||||
|
||||
// Inline to allow inlining into collection and scalar column reader.
|
||||
|
||||
@@ -56,6 +56,8 @@ public:
|
||||
pos_current_value_ = ParquetLevel::INVALID_POS;
|
||||
}
|
||||
|
||||
bool next_levels_consumed() const { return next_levels_consumed_; }
|
||||
|
||||
protected:
|
||||
ComplexColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
|
||||
const SlotDescriptor* slot_desc)
|
||||
@@ -68,5 +70,18 @@ protected:
|
||||
/// complex item tuples, or there is a single child reader that does not materialize
|
||||
/// any slot and is only used by this reader to read def and rep levels.
|
||||
std::vector<ParquetColumnReader*> children_;
|
||||
|
||||
/// True if the next definition level and repetition level have been consumed by
|
||||
/// NextLevels(). If true, NextLevels() should not be called on this reader again.
|
||||
bool next_levels_consumed_ = false;
|
||||
|
||||
void SetDescendantsNextLevelsConsumed(bool value) {
|
||||
next_levels_consumed_ = value;
|
||||
for (auto child : children_) {
|
||||
if (child->IsComplexReader()) {
|
||||
static_cast<ComplexColumnReader*>(child)->SetDescendantsNextLevelsConsumed(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
} // namespace impala
|
||||
|
||||
@@ -115,6 +115,10 @@ class ParquetLevelDecoder {
|
||||
int CacheSize() const { return num_cached_levels_; }
|
||||
int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
|
||||
int CacheCurrIdx() const { return cached_level_idx_; }
|
||||
void CachePrev() {
|
||||
DCHECK_GE(cached_level_idx_, 1);
|
||||
--cached_level_idx_;
|
||||
}
|
||||
|
||||
private:
|
||||
/// Initializes members associated with the level cache. Allocates memory for
|
||||
|
||||
@@ -23,8 +23,13 @@ bool StructColumnReader::NextLevels() {
|
||||
DCHECK(!children_.empty());
|
||||
bool result = true;
|
||||
for (ParquetColumnReader* child_reader : children_) {
|
||||
if (child_reader->IsComplexReader()
|
||||
&& static_cast<ComplexColumnReader*>(child_reader)->next_levels_consumed()) {
|
||||
continue;
|
||||
}
|
||||
result &= child_reader->NextLevels();
|
||||
}
|
||||
next_levels_consumed_ = true;
|
||||
def_level_ = children_[0]->def_level();
|
||||
rep_level_ = children_[0]->rep_level();
|
||||
if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
|
||||
@@ -46,6 +51,7 @@ bool StructColumnReader::ReadValue(MemPool* pool, Tuple* tuple, bool* read_row)
|
||||
}
|
||||
*read_row = true;
|
||||
} else {
|
||||
SetDescendantsNextLevelsConsumed(false);
|
||||
if (!HasNullCollectionAncestor<IN_COLLECTION>()) {
|
||||
SetNullSlot(tuple);
|
||||
*read_row = true;
|
||||
|
||||
@@ -25,7 +25,8 @@
|
||||
|
||||
namespace impala {
|
||||
|
||||
/// Helper struct that represents a micro batch within 'ScratchTupleBatch'.
|
||||
/// Helper struct that represents a ['start', 'end'] range of rows that needs to be
|
||||
/// scanned within a 'ScratchTupleBatch'.
|
||||
struct ScratchMicroBatch {
|
||||
int start;
|
||||
int end;
|
||||
|
||||
@@ -477,7 +477,8 @@ error_codes = (
|
||||
|
||||
("JWT_VERIFY_FAILED", 154, "Error verifying JWT Token: $0."),
|
||||
|
||||
("PARQUET_ROWS_SKIPPING", 155, "Couldn't skip rows in column '$0' in file '$1'."),
|
||||
("PARQUET_ROWS_SKIPPING", 155, "Couldn't skip rows in column '$0' in file '$1'. "
|
||||
"Detail: $2"),
|
||||
|
||||
("QUERY_OPTION_PARSE_FAILED", 156, "Failed to parse query option '$0': $1"),
|
||||
|
||||
|
||||
@@ -811,9 +811,8 @@ public class SelectStmt extends QueryStmt {
|
||||
// Do not generate a predicate if the parent tuple is outer joined.
|
||||
if (analyzer_.isOuterJoined(ref.getResolvedPath().getRootDesc().getId()))
|
||||
continue;
|
||||
// Don't push down the "is not empty" predicate for zipping unnests if there are
|
||||
// multiple zipping unnests in the FROM clause.
|
||||
if (tblRef.isZippingUnnest() && analyzer_.getNumZippingUnnests() > 1) {
|
||||
// Don't push down the "is not empty" predicate for zipping unnests.
|
||||
if (tblRef.isZippingUnnest()) {
|
||||
continue;
|
||||
}
|
||||
IsNotEmptyPredicate isNotEmptyPred =
|
||||
|
||||
@@ -20,7 +20,6 @@ PLAN-ROOT SINK
|
||||
|
|
||||
00:SCAN HDFS [functional_parquet.complextypes_arrays]
|
||||
HDFS partitions=1/1 files=1 size=1.06KB
|
||||
predicates: !empty(functional_parquet.complextypes_arrays.arr1)
|
||||
predicates on arr1: UNNEST(arr1) < 5, arr1.item < 5
|
||||
row-size=16B cardinality=1.35K
|
||||
====
|
||||
@@ -46,7 +45,6 @@ PLAN-ROOT SINK
|
||||
|
|
||||
00:SCAN HDFS [functional_parquet.complextypes_arrays a]
|
||||
HDFS partitions=1/1 files=1 size=1.06KB
|
||||
predicates: !empty(a.arr1)
|
||||
predicates on a.arr1: arr1.item < 5
|
||||
row-size=16B cardinality=1.35K
|
||||
====
|
||||
@@ -72,7 +70,6 @@ PLAN-ROOT SINK
|
||||
|
|
||||
00:SCAN HDFS [functional_parquet.complextypes_arrays]
|
||||
HDFS partitions=1/1 files=1 size=1.06KB
|
||||
predicates: !empty(arr1)
|
||||
predicates on functional_parquet.complextypes_arrays.arr1: UNNEST(functional_parquet.complextypes_arrays.arr1) < 5, functional_parquet.complextypes_arrays.arr1.item < 5
|
||||
row-size=16B cardinality=1.35K
|
||||
====
|
||||
@@ -98,7 +95,6 @@ PLAN-ROOT SINK
|
||||
|
|
||||
00:SCAN HDFS [functional_parquet.complextypes_arrays]
|
||||
HDFS partitions=1/1 files=1 size=1.06KB
|
||||
predicates: !empty(arr1)
|
||||
predicates on functional_parquet.complextypes_arrays.arr1: functional_parquet.complextypes_arrays.arr1.item < 5
|
||||
row-size=16B cardinality=1.35K
|
||||
====
|
||||
@@ -315,7 +311,6 @@ PLAN-ROOT SINK
|
||||
|
|
||||
00:SCAN HDFS [functional_parquet.complextypes_arrays]
|
||||
HDFS partitions=1/1 files=1 size=1.06KB
|
||||
predicates: !empty(functional_parquet.complextypes_arrays.arr1)
|
||||
row-size=16B cardinality=1.35K
|
||||
====
|
||||
select id, unnest(arr1), row_number() over (order by id, unnest(arr1))
|
||||
@@ -347,7 +342,6 @@ PLAN-ROOT SINK
|
||||
|
|
||||
00:SCAN HDFS [functional_parquet.complextypes_arrays]
|
||||
HDFS partitions=1/1 files=1 size=1.06KB
|
||||
predicates: !empty(functional_parquet.complextypes_arrays.arr1)
|
||||
row-size=16B cardinality=1.35K
|
||||
====
|
||||
select id, item1, item2, row_number() over (order by id, item1, item2)
|
||||
|
||||
@@ -12,4 +12,31 @@ select i, j from late_mat where j = 0;
|
||||
0,0
|
||||
---- TYPES
|
||||
INT, INT
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumTopLevelValuesSkipped): 3
|
||||
====
|
||||
---- QUERY
|
||||
# Test NumTopLevelValuesSkipped counter when there is more than one page in the
|
||||
# column.
|
||||
# Table `decimals_1_10` contains 60 rows, in which 8 rows are selected by the
|
||||
# query and 44 + 8 == 52 rows are skipped.
|
||||
set PARQUET_LATE_MATERIALIZATION_THRESHOLD=1;
|
||||
set parquet_read_page_index = false;
|
||||
select count(d_10) from decimals_1_10 where d_1 = 1;
|
||||
---- RESULTS
|
||||
8
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumPagesSkippedByLateMaterialization): 44
|
||||
aggregation(SUM, NumTopLevelValuesSkipped): 8
|
||||
====
|
||||
---- QUERY
|
||||
# Test if PARQUET_LATE_MATERIALIZATION_THRESHOLD is always 1 if there is any
|
||||
# collection that can be skipped.
|
||||
set parquet_read_page_index = false;
|
||||
select count(unnest(arr)) from nested_decimals n where d_38 = 1;
|
||||
---- RESULTS
|
||||
4
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumPagesSkippedByLateMaterialization): 0
|
||||
aggregation(SUM, NumTopLevelValuesSkipped): 17
|
||||
====
|
||||
|
||||
@@ -38,8 +38,41 @@ row_regex:.* RF00.\[min_max\] -. .\.l_orderkey.*
|
||||
aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
|
||||
====
|
||||
---- QUERY
|
||||
# Test that late materialization on nested columns is disabled.
|
||||
select * from tpch_nested_parquet.customer where c_mktsegment = 'COMEDY';
|
||||
# Test late materialization for query with one zipping unnest.
|
||||
select unnest(arr1)
|
||||
from functional_parquet.complextypes_arrays
|
||||
where id = 2;
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumPagesSkippedByLateMaterialization): 0
|
||||
aggregation(SUM, NumTopLevelValuesSkipped): 10
|
||||
====
|
||||
---- QUERY
|
||||
# Test late materialization for query with multiple zipping unnests.
|
||||
select unnest(arr1), unnest(arr2)
|
||||
from functional_parquet.complextypes_arrays
|
||||
where id = 2;
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumTopLevelValuesSkipped): 20
|
||||
====
|
||||
---- QUERY
|
||||
# Test if late materialization for collections works with page filtering.
|
||||
# In table tpch_nested_parquet.customer, min(c_phone) is '10-100-106-1617'.
|
||||
select count(o_orderkey) > 0
|
||||
from tpch_nested_parquet.customer c left outer join c.c_orders
|
||||
where c_phone < '10-100-106-16170'
|
||||
---- RESULTS
|
||||
true
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumDictFilteredRowGroups): 0
|
||||
aggregation(SUM, NumPagesSkippedByLateMaterialization)> 0
|
||||
aggregation(SUM, NumTopLevelValuesSkipped)> 0
|
||||
====
|
||||
---- QUERY
|
||||
# Test if PARQUET_LATE_MATERIALIZATION_THRESHOLD is always 1 if there is any
|
||||
# collection that can be skipped.
|
||||
set parquet_read_page_index = false;
|
||||
set expand_complex_types = true;
|
||||
select int_array_array
|
||||
from functional_parquet.complextypestbl where id % 2 = 0;
|
||||
---- RUNTIME_PROFILE
|
||||
aggregation(SUM, NumTopLevelValuesSkipped): 4
|
||||
====
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
from tests.common.file_utils import create_table_from_parquet
|
||||
|
||||
|
||||
class TestParquetLateMaterialization(ImpalaTestSuite):
|
||||
@@ -34,5 +35,7 @@ class TestParquetLateMaterialization(ImpalaTestSuite):
|
||||
self.run_test_case('QueryTest/parquet-late-materialization', vector)
|
||||
|
||||
def test_parquet_late_materialization_unique_db(self, vector, unique_database):
|
||||
create_table_from_parquet(self.client, unique_database, 'decimals_1_10')
|
||||
create_table_from_parquet(self.client, unique_database, 'nested_decimals')
|
||||
self.run_test_case('QueryTest/parquet-late-materialization-unique-db', vector,
|
||||
unique_database)
|
||||
|
||||
Reference in New Issue
Block a user