diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 268904524..63298cb8b 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -18,7 +18,6 @@ #include "exec/hdfs-parquet-scanner.h" #include // for std::numeric_limits -#include #include #include @@ -374,7 +373,7 @@ static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group, } int HdfsParquetScanner::CountScalarColumns(const vector& column_readers) { - DCHECK(!column_readers.empty()); + DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star()); int num_columns = 0; stack readers; for (ParquetColumnReader* r: column_readers_) readers.push(r); @@ -425,9 +424,39 @@ Status HdfsParquetScanner::ProcessSplit() { } Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { - if (scan_node_->IsZeroSlotTableScan()) { - // There are no materialized slots, e.g. count(*) over the table. We can serve - // this query from just the file metadata. We don't need to read the column data. + if (scan_node_->optimize_parquet_count_star()) { + // Populate the single slot with the Parquet num rows statistic. + int64_t tuple_buf_size; + uint8_t* tuple_buf; + // We try to allocate a smaller row batch here because in most cases the number row + // groups in a file is much lower than the default row batch capacity. + int capacity = min( + static_cast(file_metadata_.row_groups.size()), row_batch->capacity()); + RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_, + row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), + &capacity, &tuple_buf_size, &tuple_buf)); + while (!row_batch->AtCapacity()) { + RETURN_IF_ERROR(NextRowGroup()); + DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size()); + DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows); + if (row_group_idx_ == file_metadata_.row_groups.size()) break; + Tuple* dst_tuple = reinterpret_cast(tuple_buf); + TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow()); + InitTuple(template_tuple_, dst_tuple); + int64_t* dst_slot = reinterpret_cast(dst_tuple->GetSlot( + scan_node_->parquet_count_star_slot_offset())); + *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows; + row_group_rows_read_ += *dst_slot; + dst_row->SetTuple(0, dst_tuple); + row_batch->CommitLastRow(); + tuple_buf += scan_node_->tuple_desc()->byte_size(); + } + eos_ = row_group_idx_ == file_metadata_.row_groups.size(); + return Status::OK(); + } else if (scan_node_->IsZeroSlotTableScan()) { + // There are no materialized slots and we are not optimizing count(*), e.g. + // "select 1 from alltypes". We can serve this query from just the file metadata. + // We don't need to read the column data. 
if (row_group_rows_read_ == file_metadata_.num_rows) { eos_ = true; return Status::OK(); @@ -466,7 +495,8 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); } RETURN_IF_ERROR(NextRowGroup()); - if (row_group_idx_ >= file_metadata_.row_groups.size()) { + DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size()); + if (row_group_idx_ == file_metadata_.row_groups.size()) { eos_ = true; DCHECK(parse_status_.ok()); return Status::OK(); @@ -540,7 +570,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts( } int col_idx = node->col_idx; - DCHECK(col_idx < row_group.columns.size()); + DCHECK_LT(col_idx, row_group.columns.size()); const vector& col_orders = file_metadata.column_orders; const parquet::ColumnOrder* col_order = nullptr; @@ -1422,6 +1452,12 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc DCHECK(column_readers != NULL); DCHECK(column_readers->empty()); + if (scan_node_->optimize_parquet_count_star()) { + // Column readers are not needed because we are not reading from any columns if this + // optimization is enabled. + return Status::OK(); + } + // Each tuple can have at most one position slot. We'll process this slot desc last. SlotDescriptor* pos_slot_desc = NULL; @@ -1605,7 +1641,8 @@ Status HdfsParquetScanner::InitColumns( int64_t col_end = col_start + col_len; // Already validated in ValidateColumnOffsets() - DCHECK(col_end > 0 && col_end < file_desc->file_length); + DCHECK_GT(col_end, 0); + DCHECK_LT(col_end, file_desc->file_length); if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) { // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the // dictionary page header size in total_compressed_size and total_uncompressed_size diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 469c00e3c..0e5e9745f 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -88,6 +88,11 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? tnode.hdfs_scan_node.skip_header_line_count : 0), tuple_id_(tnode.hdfs_scan_node.tuple_id), + optimize_parquet_count_star_( + tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset), + parquet_count_star_slot_offset_( + tnode.hdfs_scan_node.__isset.parquet_count_star_slot_offset ? + tnode.hdfs_scan_node.parquet_count_star_slot_offset : -1), tuple_desc_(descs.GetTupleDescriptor(tuple_id_)), thrift_dict_filter_conjuncts_map_( tnode.hdfs_scan_node.__isset.dictionary_filter_conjuncts ? 
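The scanner changes above serve count(*) from file metadata alone: NextRowGroup() walks the footer, and for each row group one tuple is initialized and the row group's num_rows statistic is written at the byte offset the planner ships in THdfsScanNode.parquet_count_star_slot_offset. Below is a minimal, self-contained Java sketch of that bookkeeping, for illustration only; the tuple size, slot offset and per-row-group counts are assumed values, not data from this patch.

// Hypothetical sketch, not Impala code: one zero-initialized tuple per Parquet row group,
// with the row group's num_rows written at the count-star slot's byte offset.
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class CountStarSlotSketch {
  public static void main(String[] args) {
    long[] rowGroupNumRows = {100, 250, 42};  // assumed num_rows statistics per row group
    int tupleByteSize = 8;                    // assumed: tuple holds a single BIGINT slot
    int countStarSlotOffset = 0;              // assumed parquet_count_star_slot_offset value

    // Analogous to ResizeAndAllocateTupleBuffer() + InitTuple(): the buffer starts zeroed.
    ByteBuffer tupleBuf = ByteBuffer.allocate(tupleByteSize * rowGroupNumRows.length)
        .order(ByteOrder.LITTLE_ENDIAN);
    for (int i = 0; i < rowGroupNumRows.length; ++i) {
      tupleBuf.putLong(i * tupleByteSize + countStarSlotOffset, rowGroupNumRows[i]);
    }
    // The aggregation above the scan then sums these slot values (392 here) instead of
    // counting rows, so no column data is read at all.
  }
}

The row batch is also sized to the number of row groups rather than the default batch capacity, since a file usually has far fewer row groups than that capacity.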
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 11e5718ec..f71f5b4f1 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -154,6 +154,8 @@ class HdfsScanNodeBase : public ScanNode { RuntimeState* runtime_state() { return runtime_state_; } int skip_header_line_count() const { return skip_header_line_count_; } DiskIoRequestContext* reader_context() { return reader_context_; } + bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; } + int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; } typedef std::unordered_map> ConjunctEvaluatorsMap; @@ -324,6 +326,15 @@ class HdfsScanNodeBase : public ScanNode { /// Tuple id resolved in Prepare() to set tuple_desc_ const int tuple_id_; + /// Set to true when this scan node can optimize a count(*) query by populating the + /// tuple with data from the Parquet num rows statistic. See + /// applyParquetCountStartOptimization() in HdfsScanNode.java. + const bool optimize_parquet_count_star_; + + /// The byte offset of the slot for Parquet metadata if Parquet count star optimization + /// is enabled. + const int parquet_count_star_slot_offset_; + /// RequestContext object to use with the disk-io-mgr for reads. DiskIoRequestContext* reader_context_ = nullptr; diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 6299b9eb0..e5e7f24f4 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -216,6 +216,10 @@ struct THdfsScanNode { // Map from SlotIds to the indices in TPlanNode.conjuncts that are eligible // for dictionary filtering. 9: optional map> dictionary_filter_conjuncts + + // The byte offset of the slot for Parquet metadata if Parquet count star optimization + // is enabled. + 10: optional i32 parquet_count_star_slot_offset } struct TDataSourceScanNode { diff --git a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java index 4f8b4fcee..3b0ad9cfd 100644 --- a/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java +++ b/fe/src/main/java/org/apache/impala/analysis/AggregateInfo.java @@ -651,6 +651,17 @@ public class AggregateInfo extends AggregateInfoBase { return true; } + /** + * Returns true if there is a single count(*) materialized aggregate expression.
+ */ + public boolean hasCountStarOnly() { + if (getMaterializedAggregateExprs().size() != 1) return false; + if (isDistinctAgg()) return false; + FunctionCallExpr origExpr = getMaterializedAggregateExprs().get(0); + if (!origExpr.getFnName().getFunction().equalsIgnoreCase("count")) return false; + return origExpr.getParams().isStar(); + } + /** * Validates the internal state of this agg info: Checks that the number of * materialized slots of the output tuple corresponds to the number of materialized diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 3527b85bd..33fbb8b3d 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -60,11 +60,12 @@ import org.apache.impala.common.RuntimeEnv; import org.apache.impala.planner.PlanNode; import org.apache.impala.rewrite.BetweenToCompoundRule; import org.apache.impala.rewrite.EqualityDisjunctsToInRule; -import org.apache.impala.rewrite.ExprRewriter; import org.apache.impala.rewrite.ExprRewriteRule; +import org.apache.impala.rewrite.ExprRewriter; import org.apache.impala.rewrite.ExtractCommonConjunctRule; import org.apache.impala.rewrite.FoldConstantsRule; import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule; +import org.apache.impala.rewrite.NormalizeCountStarRule; import org.apache.impala.rewrite.NormalizeExprsRule; import org.apache.impala.rewrite.SimplifyConditionalsRule; import org.apache.impala.service.FeSupport; @@ -343,6 +344,7 @@ public class Analyzer { // Relies on FoldConstantsRule and NormalizeExprsRule. rules.add(SimplifyConditionalsRule.INSTANCE); rules.add(EqualityDisjunctsToInRule.INSTANCE); + rules.add(NormalizeCountStarRule.INSTANCE); } exprRewriter_ = new ExprRewriter(rules); } @@ -602,7 +604,6 @@ public class Analyzer { Preconditions.checkNotNull(resolvedPath); if (resolvedPath.destTable() != null) { Table table = resolvedPath.destTable(); - Preconditions.checkNotNull(table); if (table instanceof View) return new InlineViewRef((View) table, tableRef); // The table must be a base table. Preconditions.checkState(table instanceof HdfsTable || @@ -695,6 +696,7 @@ public class Analyzer { return globalState_.descTbl.getSlotDesc(id); } + public int getNumTableRefs() { return tableRefMap_.size(); } public TableRef getTableRef(TupleId tid) { return tableRefMap_.get(tid); } public ExprRewriter getConstantFolder() { return globalState_.constantFolder_; } public ExprRewriter getExprRewriter() { return globalState_.exprRewriter_; } diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java index 8538492e3..f263cd51e 100644 --- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java +++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java @@ -51,7 +51,7 @@ public class FunctionCallExpr extends Expr { // feeding into this Merge(). This is stored so that we can access the types of the // original input argument exprs. Note that the nullness affects the behaviour of // resetAnalysisState(), which is used during expr substitution. - private final FunctionCallExpr mergeAggInputFn_; + private FunctionCallExpr mergeAggInputFn_; // Printed in toSqlImpl(), if set. Used for merge agg fns. 
private String label_; @@ -568,6 +568,9 @@ public class FunctionCallExpr extends Expr { if (hasChildCosts()) evalCost_ = getChildCosts() + FUNCTION_CALL_COST; } + public FunctionCallExpr getMergeAggInputFn() { return mergeAggInputFn_; } + public void setMergeAggInputFn(FunctionCallExpr fn) { mergeAggInputFn_ = fn; } + /** * Checks that no special aggregate params are included in 'params' that would be * invalid for a scalar function. Analysis of the param exprs is not done. @@ -604,4 +607,21 @@ public class FunctionCallExpr extends Expr { @Override public Expr clone() { return new FunctionCallExpr(this); } + + @Override + protected Expr substituteImpl(ExprSubstitutionMap smap, Analyzer analyzer) + throws AnalysisException { + Expr e = super.substituteImpl(smap, analyzer); + if (!(e instanceof FunctionCallExpr)) return e; + FunctionCallExpr fn = (FunctionCallExpr) e; + FunctionCallExpr mergeFn = fn.getMergeAggInputFn(); + if (mergeFn != null) { + // The merge function needs to be substituted as well. + Expr substitutedFn = mergeFn.substitute(smap, analyzer, true); + Preconditions.checkState(substitutedFn instanceof FunctionCallExpr); + fn.setMergeAggInputFn((FunctionCallExpr) substitutedFn); + } + return e; + } + } diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java b/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java index c0a78c701..32e23d75a 100644 --- a/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java +++ b/fe/src/main/java/org/apache/impala/analysis/FunctionParams.java @@ -23,7 +23,7 @@ import java.util.List; * Return value of the grammar production that parses function * parameters. These parameters can be for scalar or aggregate functions. */ -class FunctionParams implements Cloneable { +public class FunctionParams implements Cloneable { private final boolean isStar_; private boolean isDistinct_; private boolean isIgnoreNulls_; diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java index bc7da2b1c..a87cd3adc 100644 --- a/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java +++ b/fe/src/main/java/org/apache/impala/analysis/TupleDescriptor.java @@ -339,7 +339,7 @@ public class TupleDescriptor { for (SlotDescriptor slotDesc: getSlots()) { if (!slotDesc.isMaterialized()) continue; if (slotDesc.getColumn() == null || - slotDesc.getColumn().getPosition() >= hdfsTable.getNumClusteringCols()) { + !hdfsTable.isClusteringColumn(slotDesc.getColumn())) { return false; } } diff --git a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java index 93f859d8f..58f5d1568 100644 --- a/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java +++ b/fe/src/main/java/org/apache/impala/catalog/BuiltinsDb.java @@ -880,6 +880,16 @@ public class BuiltinsDb extends Db { prefix + "16SumDecimalRemoveEPN10impala_udf15FunctionContextERKNS1_10DecimalValEPS4_", null, false, true, false)); + // Sum that returns zero on an empty input. 
+ db.addBuiltin(AggregateFunction.createBuiltin(db, "sum_init_zero", + Lists.newArrayList(Type.BIGINT), Type.BIGINT, Type.BIGINT, + prefix + "8InitZeroIN10impala_udf9BigIntValEEEvPNS2_15FunctionContextEPT_", + prefix + "9SumUpdateIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_", + prefix + "9SumUpdateIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_", + null, null, + prefix + "9SumRemoveIN10impala_udf9BigIntValES3_EEvPNS2_15FunctionContextERKT_PT0_", + null, false, true, true)); + // Avg // TODO: switch to CHAR(sizeof(AvgIntermediateType) when that becomes available db.addBuiltin(AggregateFunction.createBuiltin(db, "avg", diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index a08f0e824..fa9038a67 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -27,10 +27,15 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.impala.analysis.AggregateInfo; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.DescriptorTable; import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.ExprSubstitutionMap; +import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.FunctionName; +import org.apache.impala.analysis.FunctionParams; import org.apache.impala.analysis.InPredicate; import org.apache.impala.analysis.LiteralExpr; import org.apache.impala.analysis.NullLiteral; @@ -43,9 +48,9 @@ import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; import org.apache.impala.catalog.Column; import org.apache.impala.catalog.HdfsFileFormat; -import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsPartition.FileBlock; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.catalog.HdfsPartition; import org.apache.impala.catalog.HdfsTable; import org.apache.impala.catalog.Type; import org.apache.impala.common.FileSystemUtil; @@ -97,6 +102,13 @@ import com.google.common.collect.Sets; * parquet::Statistics of row groups. If the conjuncts don't match, then whole row groups * will be skipped. * + * Count(*) aggregation optimization flow: + * The caller passes in an AggregateInfo to the constructor that this scan node uses to + * determine whether to apply the optimization or not. The produced smap must then be + * applied to the AggregateInfo in this query block. We do not apply the smap in this + * class directly to avoid side effects and make it easier to reason about. + * See HdfsScanNode.applyParquetCountStartOptimization(). + * * TODO: pass in range restrictions. */ public class HdfsScanNode extends ScanNode { @@ -126,6 +138,10 @@ public class HdfsScanNode extends ScanNode { private final TReplicaPreference replicaPreference_; private final boolean randomReplica_; + // The AggregationInfo from the query block of this scan node. Used for determining if + // the Parquet count(*) optimization can be applied. + private final AggregateInfo aggInfo_; + // Number of partitions, files and bytes scanned. Set in computeScanRangeLocations(). // Might not match 'partitions_' due to table sampling. 
private int numPartitions_ = 0; @@ -140,6 +156,11 @@ public class HdfsScanNode extends ScanNode { // True if this scan node should use the MT implementation in the backend. private boolean useMtScanNode_; + // Should be applied to the AggregateInfo from the same query block. We cannot use the + // PlanNode.outputSmap_ for this purpose because we don't want the smap entries to be + // propagated outside the query block. + protected ExprSubstitutionMap optimizedAggSmap_; + // Conjuncts that can be evaluated while materializing the items (tuples) of // collection-typed slots. Maps from tuple descriptor to the conjuncts bound by that // tuple. Uses a linked hash map for consistent display in explain. @@ -182,6 +203,10 @@ public class HdfsScanNode extends ScanNode { // parquet::Statistics. private TupleDescriptor minMaxTuple_; + // Slot that is used to record the Parquet metatdata for the count(*) aggregation if + // this scan node has the count(*) optimization enabled. + private SlotDescriptor countStarSlot_ = null; + /** * Construct a node to scan given data files into tuples described by 'desc', * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and @@ -189,7 +214,7 @@ public class HdfsScanNode extends ScanNode { * class comments above for details. */ public HdfsScanNode(PlanNodeId id, TupleDescriptor desc, List conjuncts, - List partitions, TableRef hdfsTblRef) { + List partitions, TableRef hdfsTblRef, AggregateInfo aggInfo) { super(id, desc, "SCAN HDFS"); Preconditions.checkState(desc.getTable() instanceof HdfsTable); tbl_ = (HdfsTable)desc.getTable(); @@ -201,6 +226,7 @@ public class HdfsScanNode extends ScanNode { HdfsTable hdfsTable = (HdfsTable)hdfsTblRef.getTable(); Preconditions.checkState(tbl_ == hdfsTable); StringBuilder error = new StringBuilder(); + aggInfo_ = aggInfo; skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error); if (error.length() > 0) { // Any errors should already have been caught during analysis. @@ -217,6 +243,47 @@ public class HdfsScanNode extends ScanNode { return helper.addValue(super.debugString()).toString(); } + /** + * Adds a new slot descriptor to the tuple descriptor of this scan. The new slot will be + * used for storing the data extracted from the Parquet num rows statistic. Also adds an + * entry to 'optimizedAggSmap_' that substitutes count(*) with + * sum_init_zero(). Returns the new slot descriptor. + */ + private SlotDescriptor applyParquetCountStartOptimization(Analyzer analyzer) { + FunctionCallExpr countFn = new FunctionCallExpr(new FunctionName("count"), + FunctionParams.createStarParam()); + countFn.analyzeNoThrow(analyzer); + + // Create the sum function. + SlotDescriptor sd = analyzer.addSlotDescriptor(getTupleDesc()); + sd.setType(Type.BIGINT); + sd.setIsMaterialized(true); + sd.setIsNullable(false); + sd.setLabel("parquet-stats: num_rows"); + ArrayList args = Lists.newArrayList(); + args.add(new SlotRef(sd)); + FunctionCallExpr sumFn = new FunctionCallExpr("sum_init_zero", args); + sumFn.analyzeNoThrow(analyzer); + + optimizedAggSmap_ = new ExprSubstitutionMap(); + optimizedAggSmap_.put(countFn, sumFn); + return sd; + } + + /** + * Returns true if the Parquet count(*) optimization can be applied to the query block + * of this scan node. 
+ */ + private boolean canApplyParquetCountStarOptimization(Analyzer analyzer, + Set fileFormats) { + if (analyzer.getNumTableRefs() != 1) return false; + if (aggInfo_ == null || !aggInfo_.hasCountStarOnly()) return false; + if (fileFormats.size() != 1) return false; + if (!fileFormats.contains(HdfsFileFormat.PARQUET)) return false; + if (!conjuncts_.isEmpty()) return false; + return desc_.getMaterializedSlots().isEmpty() || desc_.hasClusteringColsOnly(); + } + /** * Populate collectionConjuncts_ and scanRanges_. */ @@ -227,7 +294,6 @@ public class HdfsScanNode extends ScanNode { assignCollectionConjuncts(analyzer); computeDictionaryFilterConjuncts(analyzer); - computeMemLayout(analyzer); // compute scan range locations with optional sampling Set fileFormats = computeScanRangeLocations(analyzer); @@ -248,7 +314,16 @@ public class HdfsScanNode extends ScanNode { computeMinMaxTupleAndConjuncts(analyzer); } - // do this at the end so it can take all conjuncts and scan ranges into account + if (canApplyParquetCountStarOptimization(analyzer, fileFormats)) { + Preconditions.checkState(desc_.getPath().destTable() != null); + Preconditions.checkState(collectionConjuncts_.isEmpty()); + countStarSlot_ = applyParquetCountStartOptimization(analyzer); + } + + computeMemLayout(analyzer); + + // This is towards the end, so that it can take all conjuncts, scan ranges and mem + // layout into account. computeStats(analyzer); // TODO: do we need this? @@ -310,10 +385,6 @@ public class HdfsScanNode extends ScanNode { } } - public boolean isPartitionedTable() { - return desc_.getTable().getNumClusteringCols() > 0; - } - /** * Populates the collection conjuncts, materializes their required slots, and marks * the conjuncts as assigned, if it is correct to do so. Some conjuncts may have to @@ -855,6 +926,11 @@ public class HdfsScanNode extends ScanNode { msg.hdfs_scan_node.setSkip_header_line_count(skipHeaderLineCount_); } msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_); + Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null)); + if (countStarSlot_ != null) { + msg.hdfs_scan_node.setParquet_count_star_slot_offset( + countStarSlot_.getByteOffset()); + } if (!minMaxConjuncts_.isEmpty()) { for (Expr e: minMaxConjuncts_) { msg.hdfs_scan_node.addToMin_max_conjuncts(e.treeToThrift()); @@ -1016,6 +1092,8 @@ public class HdfsScanNode extends ScanNode { MAX_IO_BUFFERS_PER_THREAD * BackendConfig.INSTANCE.getReadSize(); } + public ExprSubstitutionMap getOptimizedAggSmap() { return optimizedAggSmap_; } + @Override public boolean isTableMissingTableStats() { if (extrapolatedNumRows_ >= 0) return false; diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java index 0b36922df..8448da59d 100644 --- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java @@ -93,7 +93,8 @@ abstract public class PlanNode extends TreeNode { // assigned to a fragment. Set and maintained by enclosing PlanFragment. protected PlanFragment fragment_; - // if set, needs to be applied by parent node to reference this node's output + // If set, needs to be applied by parent node to reference this node's output. The + // entries need to be propagated all the way to the root node. 
protected ExprSubstitutionMap outputSmap_; // global state of planning wrt conjunct assignment; used by planner as a shortcut @@ -206,7 +207,6 @@ abstract public class PlanNode extends TreeNode { /** * Set the limit_ to the given limit_ only if the limit_ hasn't been set, or the new limit_ * is lower. - * @param limit_ */ public void setLimit(long limit) { if (limit_ == -1 || (limit != -1 && limit_ > limit)) limit_ = limit; diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java index b85532fcd..1373e8921 100644 --- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java @@ -168,7 +168,7 @@ abstract public class ScanNode extends PlanNode { public boolean isTableMissingColumnStats() { for (SlotDescriptor slot: desc_.getSlots()) { - if (!slot.getStats().hasStats()) return true; + if (slot.getColumn() != null && !slot.getStats().hasStats()) return true; } return false; } diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 62c045ba6..3e0692b50 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -31,12 +31,13 @@ import org.apache.impala.analysis.AggregateInfo; import org.apache.impala.analysis.AnalyticInfo; import org.apache.impala.analysis.Analyzer; import org.apache.impala.analysis.BaseTableRef; -import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.BinaryPredicate.Operator; +import org.apache.impala.analysis.BinaryPredicate; import org.apache.impala.analysis.CollectionTableRef; import org.apache.impala.analysis.Expr; import org.apache.impala.analysis.ExprId; import org.apache.impala.analysis.ExprSubstitutionMap; +import org.apache.impala.analysis.FunctionCallExpr; import org.apache.impala.analysis.InlineViewRef; import org.apache.impala.analysis.JoinOperator; import org.apache.impala.analysis.NullLiteral; @@ -51,8 +52,8 @@ import org.apache.impala.analysis.TableSampleClause; import org.apache.impala.analysis.TupleDescriptor; import org.apache.impala.analysis.TupleId; import org.apache.impala.analysis.TupleIsNullPredicate; -import org.apache.impala.analysis.UnionStmt; import org.apache.impala.analysis.UnionStmt.UnionOperand; +import org.apache.impala.analysis.UnionStmt; import org.apache.impala.catalog.ColumnStats; import org.apache.impala.catalog.DataSourceTable; import org.apache.impala.catalog.HBaseTable; @@ -601,24 +602,22 @@ public class SingleNodePlanner { return createAggregationPlan(selectStmt, analyzer, emptySetNode); } - AggregateInfo aggInfo = selectStmt.getAggInfo(); - // For queries which contain partition columns only, we may use the metadata instead - // of table scans. This is only feasible if all materialized aggregate expressions - // have distinct semantics. Please see createHdfsScanPlan() for details. - boolean fastPartitionKeyScans = - analyzer.getQueryCtx().client_request.query_options.optimize_partition_key_scans && - aggInfo != null && aggInfo.hasAllDistinctAgg(); - // Separate table refs into parent refs (uncorrelated or absolute) and // subplan refs (correlated or relative), and generate their plan. 
List parentRefs = Lists.newArrayList(); List subplanRefs = Lists.newArrayList(); computeParentAndSubplanRefs( selectStmt.getTableRefs(), analyzer.isStraightJoin(), parentRefs, subplanRefs); - PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, fastPartitionKeyScans, - analyzer); + AggregateInfo aggInfo = selectStmt.getAggInfo(); + PlanNode root = createTableRefsPlan(parentRefs, subplanRefs, aggInfo, analyzer); // add aggregation, if any - if (aggInfo != null) root = createAggregationPlan(selectStmt, analyzer, root); + if (aggInfo != null) { + if (root instanceof HdfsScanNode) { + aggInfo.substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); + aggInfo.getMergeAggInfo().substitute(((HdfsScanNode) root).getOptimizedAggSmap(), analyzer); + } + root = createAggregationPlan(selectStmt, analyzer, root); + } // All the conjuncts_ should be assigned at this point. // TODO: Re-enable this check here and/or elswehere. @@ -763,19 +762,16 @@ public class SingleNodePlanner { /** * Returns a plan tree for evaluating the given parentRefs and subplanRefs. - * - * 'fastPartitionKeyScans' indicates whether to try to produce slots with - * metadata instead of table scans. */ private PlanNode createTableRefsPlan(List parentRefs, - List subplanRefs, boolean fastPartitionKeyScans, - Analyzer analyzer) throws ImpalaException { + List subplanRefs, AggregateInfo aggInfo, Analyzer analyzer) + throws ImpalaException { // create plans for our table refs; use a list here instead of a map to // maintain a deterministic order of traversing the TableRefs during join // plan generation (helps with tests) List> parentRefPlans = Lists.newArrayList(); for (TableRef ref: parentRefs) { - PlanNode root = createTableRefNode(ref, fastPartitionKeyScans, analyzer); + PlanNode root = createTableRefNode(ref, aggInfo, analyzer); Preconditions.checkNotNull(root); root = createSubplan(root, subplanRefs, true, analyzer); parentRefPlans.add(new Pair(ref, root)); @@ -845,7 +841,7 @@ public class SingleNodePlanner { // their containing SubplanNode. Also, further plan generation relies on knowing // whether we are in a subplan context or not (see computeParentAndSubplanRefs()). ctx_.pushSubplan(subplanNode); - PlanNode subplan = createTableRefsPlan(applicableRefs, subplanRefs, false, analyzer); + PlanNode subplan = createTableRefsPlan(applicableRefs, subplanRefs, null, analyzer); ctx_.popSubplan(); subplanNode.setSubplan(subplan); subplanNode.init(analyzer); @@ -1194,11 +1190,10 @@ public class SingleNodePlanner { /** * Create a node to materialize the slots in the given HdfsTblRef. * - * If 'hdfsTblRef' only contains partition columns and 'fastPartitionKeyScans' - * is true, the slots may be produced directly in this function using the metadata. - * Otherwise, a HdfsScanNode will be created. + * The given 'aggInfo' is used for detecting and applying optimizations that span both + * the scan and aggregation. */ - private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, boolean fastPartitionKeyScans, + private PlanNode createHdfsScanPlan(TableRef hdfsTblRef, AggregateInfo aggInfo, List conjuncts, Analyzer analyzer) throws ImpalaException { TupleDescriptor tupleDesc = hdfsTblRef.getDesc(); @@ -1210,6 +1205,13 @@ public class SingleNodePlanner { // Mark all slots referenced by the remaining conjuncts as materialized. analyzer.materializeSlots(conjuncts); + // For queries which contain partition columns only, we may use the metadata instead + // of table scans. 
This is only feasible if all materialized aggregate expressions + // have distinct semantics. Please see createHdfsScanPlan() for details. + boolean fastPartitionKeyScans = + analyzer.getQueryCtx().client_request.query_options.optimize_partition_key_scans && + aggInfo != null && aggInfo.hasAllDistinctAgg(); + // If the optimization for partition key scans with metadata is enabled, // try evaluating with metadata first. If not, fall back to scanning. if (fastPartitionKeyScans && tupleDesc.hasClusteringColsOnly()) { @@ -1245,7 +1247,7 @@ public class SingleNodePlanner { } else { ScanNode scanNode = new HdfsScanNode(ctx_.getNextNodeId(), tupleDesc, conjuncts, partitions, - hdfsTblRef); + hdfsTblRef, aggInfo); scanNode.init(analyzer); return scanNode; } @@ -1254,13 +1256,13 @@ public class SingleNodePlanner { /** * Create node for scanning all data files of a particular table. * - * 'fastPartitionKeyScans' indicates whether to try to produce the slots with - * metadata instead of table scans. Only applicable to HDFS tables. + * The given 'aggInfo' is used for detecting and applying optimizations that span both + * the scan and aggregation. Only applicable to HDFS table refs. * * Throws if a PlanNode.init() failed or if planning of the given * table ref is not implemented. */ - private PlanNode createScanNode(TableRef tblRef, boolean fastPartitionKeyScans, + private PlanNode createScanNode(TableRef tblRef, AggregateInfo aggInfo, Analyzer analyzer) throws ImpalaException { ScanNode scanNode = null; @@ -1289,9 +1291,10 @@ public class SingleNodePlanner { Table table = tblRef.getTable(); if (table instanceof HdfsTable) { - return createHdfsScanPlan(tblRef, fastPartitionKeyScans, conjuncts, analyzer); + return createHdfsScanPlan(tblRef, aggInfo, conjuncts, analyzer); } else if (table instanceof DataSourceTable) { - scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), conjuncts); + scanNode = new DataSourceScanNode(ctx_.getNextNodeId(), tblRef.getDesc(), + conjuncts); scanNode.init(analyzer); return scanNode; } else if (table instanceof HBaseTable) { @@ -1496,18 +1499,17 @@ public class SingleNodePlanner { * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef, * CollectionTableRef or an InlineViewRef. * - * 'fastPartitionKeyScans' indicates whether to try to produce the slots with - * metadata instead of table scans. Only applicable to BaseTableRef which is also - * an HDFS table. + * The given 'aggInfo' is used for detecting and applying optimizations that span both + * the scan and aggregation. Only applicable to HDFS table refs. * * Throws if a PlanNode.init() failed or if planning of the given * table ref is not implemented. 
*/ - private PlanNode createTableRefNode(TableRef tblRef, boolean fastPartitionKeyScans, + private PlanNode createTableRefNode(TableRef tblRef, AggregateInfo aggInfo, Analyzer analyzer) throws ImpalaException { PlanNode result = null; if (tblRef instanceof BaseTableRef) { - result = createScanNode(tblRef, fastPartitionKeyScans, analyzer); + result = createScanNode(tblRef, aggInfo, analyzer); } else if (tblRef instanceof CollectionTableRef) { if (tblRef.isRelative()) { Preconditions.checkState(ctx_.hasSubplan()); @@ -1515,7 +1517,7 @@ (CollectionTableRef) tblRef); result.init(analyzer); } else { - result = createScanNode(tblRef, false, analyzer); + result = createScanNode(tblRef, null, analyzer); } } else if (tblRef instanceof InlineViewRef) { result = createInlineViewPlan(analyzer, (InlineViewRef) tblRef); diff --git a/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java b/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java new file mode 100644 index 000000000..90556c1ef --- /dev/null +++ b/fe/src/main/java/org/apache/impala/rewrite/NormalizeCountStarRule.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.rewrite; + +import org.apache.impala.analysis.Analyzer; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.FunctionCallExpr; +import org.apache.impala.analysis.FunctionName; +import org.apache.impala.analysis.FunctionParams; +import org.apache.impala.common.AnalysisException; + +/** + * Replaces count(<literal>) with an equivalent count(*).
+ * + * Examples: + * count(1) --> count(*) + * count(2017) --> count(*) + * count(null) --> count(null) + */ +public class NormalizeCountStarRule implements ExprRewriteRule { + public static ExprRewriteRule INSTANCE = new NormalizeCountStarRule(); + + @Override + public Expr apply(Expr expr, Analyzer analyzer) throws AnalysisException { + if (!(expr instanceof FunctionCallExpr)) return expr; + FunctionCallExpr origExpr = (FunctionCallExpr) expr; + if (!origExpr.getFnName().getFunction().equalsIgnoreCase("count")) return expr; + if (origExpr.getParams().isStar()) return expr; + if (origExpr.getParams().isDistinct()) return expr; + if (origExpr.getParams().exprs().size() != 1) return expr; + Expr child = origExpr.getChild(0); + if (!child.isLiteral()) return expr; + if (child.isNullLiteral()) return expr; + FunctionCallExpr result = new FunctionCallExpr(new FunctionName("count"), + FunctionParams.createStarParam()); + return result; + } + + private NormalizeCountStarRule() {} +} diff --git a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java index d20aedfad..e49c6525f 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ExprRewriteRulesTest.java @@ -24,13 +24,14 @@ import org.apache.impala.common.AnalysisException; import org.apache.impala.common.FrontendTestBase; import org.apache.impala.rewrite.BetweenToCompoundRule; import org.apache.impala.rewrite.EqualityDisjunctsToInRule; -import org.apache.impala.rewrite.SimplifyConditionalsRule; import org.apache.impala.rewrite.ExprRewriteRule; import org.apache.impala.rewrite.ExprRewriter; import org.apache.impala.rewrite.ExtractCommonConjunctRule; import org.apache.impala.rewrite.FoldConstantsRule; import org.apache.impala.rewrite.NormalizeBinaryPredicatesRule; +import org.apache.impala.rewrite.NormalizeCountStarRule; import org.apache.impala.rewrite.NormalizeExprsRule; +import org.apache.impala.rewrite.SimplifyConditionalsRule; import org.junit.Assert; import org.junit.Test; @@ -510,4 +511,18 @@ public class ExprRewriteRulesTest extends FrontendTestBase { + "(select smallint_col from functional.alltypessmall where smallint_col<10)", edToInrule, null); } + + @Test + public void TestNormalizeCountStarRule() throws AnalysisException { + ExprRewriteRule rule = NormalizeCountStarRule.INSTANCE; + + RewritesOk("count(1)", rule, "count(*)"); + RewritesOk("count(5)", rule, "count(*)"); + + // Verify that these don't get rewritten. 
+ RewritesOk("count(null)", rule, null); + RewritesOk("count(id)", rule, null); + RewritesOk("count(1 + 1)", rule, null); + RewritesOk("count(1 + null)", rule, null); + } } diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index d33e678c5..4641d68d6 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -301,6 +301,9 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("conjunct-ordering"); } + @Test + public void testParquetStatsAgg() { runPlannerTestFile("parquet-stats-agg"); } + @Test public void testParquetFiltering() { TQueryOptions options = defaultQueryOptions(); diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test index ffabd6bc3..a707e44ac 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/disable-codegen.test @@ -41,7 +41,7 @@ PLAN-ROOT SINK select count(*) from functional_parquet.alltypes ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=0B -Per-Host Resource Estimates: Memory=20.00MB +Per-Host Resource Estimates: Memory=26.00MB WARNING: The following tables are missing relevant table and/or column statistics. functional_parquet.alltypes @@ -53,10 +53,10 @@ PLAN-ROOT SINK 02:EXCHANGE [UNPARTITIONED] | 01:AGGREGATE -| output: count(*) +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) | 00:SCAN HDFS [functional_parquet.alltypes] - partitions=24/24 files=24 size=178.13KB + partitions=24/24 files=24 size=179.68KB ==== # > 3000 rows returned to coordinator: codegen should be enabled select * from functional_parquet.alltypes @@ -71,7 +71,7 @@ PLAN-ROOT SINK 01:EXCHANGE [UNPARTITIONED] | 00:SCAN HDFS [functional_parquet.alltypes] - partitions=24/24 files=24 size=178.13KB + partitions=24/24 files=24 size=179.68KB ==== # Optimisation is enabled for join producing < 3000 rows select count(*) diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test index b5361a6db..0a41fb9ef 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/distinct.test @@ -445,11 +445,11 @@ where t.x + t.y > 10 and t.x > 0 and t.y > 1 PLAN-ROOT SINK | 02:AGGREGATE [FINALIZE] -| output: count(1), count:merge(1) -| having: count(1) > 0, zeroifnull(count(1)) > 1, count(1) + zeroifnull(count(1)) > 10 +| output: count(1), count:merge(*) +| having: count(1) > 0, zeroifnull(count(*)) > 1, count(1) + zeroifnull(count(*)) > 10 | 01:AGGREGATE -| output: count(1) +| output: count(*) | group by: 1 | 00:SCAN HDFS [functional.alltypes] @@ -458,22 +458,22 @@ PLAN-ROOT SINK PLAN-ROOT SINK | 06:AGGREGATE [FINALIZE] -| output: count:merge(1), count:merge(1) -| having: count(1) > 0, zeroifnull(count(1)) > 1, count(1) + zeroifnull(count(1)) > 10 +| output: count:merge(1), count:merge(*) +| having: count(1) > 0, zeroifnull(count(*)) > 1, count(1) + zeroifnull(count(*)) > 10 | 05:EXCHANGE [UNPARTITIONED] | 02:AGGREGATE -| output: count(1), count:merge(1) +| output: count(1), count:merge(*) | 04:AGGREGATE -| output: count:merge(1) +| output: count:merge(*) | group by: 1 | 03:EXCHANGE [HASH(1)] | 
01:AGGREGATE [STREAMING] -| output: count(1) +| output: count(*) | group by: 1 | 00:SCAN HDFS [functional.alltypes] diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test index 08dbd4dc7..c4b45babb 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/join-order.test @@ -1458,7 +1458,7 @@ on (t3.string_col = t1.string_col_1 and t3.date_string_col = t1.string_col_2) PLAN-ROOT SINK | 05:AGGREGATE [FINALIZE] -| output: count(1) +| output: count(*) | 04:HASH JOIN [LEFT OUTER JOIN] | hash predicates: t2.string_col = t3.string_col, t1.string_col = t3.date_string_col diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test new file mode 100644 index 000000000..1afe61c15 --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test @@ -0,0 +1,349 @@ +# Verify that that the parquet count(*) optimization is applied in all count(*) or +# count() cases when scanning a Parquet table. In the last case, we are scanning +# a text table, so the optimization is not applied. +select count(*) from functional_parquet.alltypes +union all +select count(1) from functional_parquet.alltypes +union all +select count(123) from functional_parquet.alltypes +union all +select count(*) from functional.alltypes +---- PLAN +PLAN-ROOT SINK +| +00:UNION +| pass-through-operands: all +| +|--08:AGGREGATE [FINALIZE] +| | output: count(*) +| | +| 07:SCAN HDFS [functional.alltypes] +| partitions=24/24 files=24 size=478.45KB +| +|--06:AGGREGATE [FINALIZE] +| | output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| | +| 05:SCAN HDFS [functional_parquet.alltypes] +| partitions=24/24 files=24 size=178.13KB +| +|--04:AGGREGATE [FINALIZE] +| | output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| | +| 03:SCAN HDFS [functional_parquet.alltypes] +| partitions=24/24 files=24 size=178.13KB +| +02:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| +01:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +00:UNION +| pass-through-operands: all +| +|--16:AGGREGATE [FINALIZE] +| | output: count:merge(*) +| | +| 15:EXCHANGE [UNPARTITIONED] +| | +| 08:AGGREGATE +| | output: count(*) +| | +| 07:SCAN HDFS [functional.alltypes] +| partitions=24/24 files=24 size=478.45KB +| +|--14:AGGREGATE [FINALIZE] +| | output: count:merge(*) +| | +| 13:EXCHANGE [UNPARTITIONED] +| | +| 06:AGGREGATE +| | output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| | +| 05:SCAN HDFS [functional_parquet.alltypes] +| partitions=24/24 files=24 size=178.13KB +| +|--12:AGGREGATE [FINALIZE] +| | output: count:merge(*) +| | +| 11:EXCHANGE [UNPARTITIONED] +| | +| 04:AGGREGATE +| | output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| | +| 03:SCAN HDFS [functional_parquet.alltypes] +| partitions=24/24 files=24 size=178.13KB +| +10:AGGREGATE [FINALIZE] +| output: count:merge(*) +| +09:EXCHANGE [UNPARTITIONED] +| +02:AGGREGATE +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| +01:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Verify that 
the parquet count(*) optimization is applied even if there is more than +# one item in the select list. +select count(*), count(1), count(123) from functional_parquet.alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Select count() - the optimization should be disabled because it's not a +# count() or count(*) aggregate function. +select count(year) from functional_parquet.alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(year) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Group by partition columns. +select month, count(*) from functional_parquet.alltypes group by month, year +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| group by: month, year +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# The optimization is disabled because tinyint_col is not a partition col. +select tinyint_col, count(*) from functional_parquet.alltypes group by tinyint_col, year +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| group by: tinyint_col, year +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# The optimization is disabled because there are two aggregate functions. +select avg(year), count(*) from functional_parquet.alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: avg(year), count(*) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Optimization is not applied because the inner count(*) is not materialized. The outer +# count(*) does not reference a base table. +select count(*) from (select count(*) from functional_parquet.alltypes) t +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(*) +| +01:AGGREGATE [FINALIZE] +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# The optimization is applied if count(*) is in the having clause. +select 1 from functional_parquet.alltypes having count(*) > 1 +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| having: count(*) > 1 +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# The count(*) optimization is applied in the inline view. +select count(*), count(a) from (select count(1) as a from functional_parquet.alltypes) t +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(*), count(count(*)) +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# The count(*) optimization is applied to the inline view even if there is a join. 
+select * +from functional.alltypes x inner join ( + select count(1) as a from functional_parquet.alltypes group by year +) t on x.id = t.a; +---- PLAN +PLAN-ROOT SINK +| +03:HASH JOIN [INNER JOIN] +| hash predicates: x.id = count(*) +| runtime filters: RF000 <- count(*) +| +|--02:AGGREGATE [FINALIZE] +| | output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| | group by: year +| | +| 01:SCAN HDFS [functional_parquet.alltypes] +| partitions=24/24 files=24 size=178.13KB +| +00:SCAN HDFS [functional.alltypes x] + partitions=24/24 files=24 size=478.45KB + runtime filters: RF000 -> x.id +==== +# The count(*) optimization is not applied if there is more than 1 table ref. +select count(*) from functional_parquet.alltypes a, functional_parquet.alltypes b +---- PLAN +PLAN-ROOT SINK +| +03:AGGREGATE [FINALIZE] +| output: count(*) +| +02:NESTED LOOP JOIN [CROSS JOIN] +| +|--01:SCAN HDFS [functional_parquet.alltypes b] +| partitions=24/24 files=24 size=178.13KB +| +00:SCAN HDFS [functional_parquet.alltypes a] + partitions=24/24 files=24 size=178.13KB +==== +# The count(*) optimization is applied if there are predicates on partition columns. +select count(1) from functional_parquet.alltypes where year < 2010 and month > 8; +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=4/24 files=4 size=29.75KB +==== +# tinyint_col is not a partition column so the optimization is disabled. +select count(1) from functional_parquet.alltypes where year < 2010 and tinyint_col > 8; +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=12/24 files=12 size=89.05KB + predicates: tinyint_col > 8 +==== +# Optimization is applied after constant folding. +select count(1 + 2 + 3) from functional_parquet.alltypes +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Optimization is not applied to count(null). +select count(1 + null + 3) from functional_parquet.alltypes +union all +select count(null) from functional_parquet.alltypes +---- PLAN +PLAN-ROOT SINK +| +00:UNION +| pass-through-operands: all +| +|--04:AGGREGATE [FINALIZE] +| | output: count(NULL) +| | +| 03:SCAN HDFS [functional_parquet.alltypes] +| partitions=24/24 files=24 size=178.13KB +| +02:AGGREGATE [FINALIZE] +| output: count(NULL + 3) +| +01:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Optimization is not applied when selecting from an empty table. +select count(*) from functional_parquet.emptytable +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| +00:SCAN HDFS [functional_parquet.emptytable] + partitions=0/0 files=0 size=0B +==== +# Optimization is not applied when all partitions are pruned. +select count(1) from functional_parquet.alltypes where year = -1 +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=0/24 files=0 size=0B +==== +# Optimization is not applied across query blocks, even though it would be correct here. 
+select count(*) from (select int_col from functional_parquet.alltypes) t +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: count(*) +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# Optimization is not applied when there is a distinct agg. +select count(*), count(distinct 1) from functional_parquet.alltypes +---- PLAN +PLAN-ROOT SINK +| +02:AGGREGATE [FINALIZE] +| output: count(1), count:merge(*) +| +01:AGGREGATE +| output: count(*) +| group by: 1 +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=24/24 files=24 size=178.13KB +==== +# The optimization is applied here because only the count(*) and a partition column are +# materialized. Non-materialized agg exprs are ignored. +select year, cnt from ( + select year, count(bigint_col), count(*) cnt, avg(int_col) + from functional_parquet.alltypes + where month=1 + group by year +) t +---- PLAN +PLAN-ROOT SINK +| +01:AGGREGATE [FINALIZE] +| output: sum_init_zero(functional_parquet.alltypes.parquet-stats: num_rows) +| group by: year +| +00:SCAN HDFS [functional_parquet.alltypes] + partitions=2/24 files=2 size=15.01KB +==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index 0d527de5a..bb97c2699 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -320,7 +320,7 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6 select count(*) from tpch_parquet.lineitem ---- DISTRIBUTEDPLAN Per-Host Resource Reservation: Memory=0B -Per-Host Resource Estimates: Memory=20.00MB +Per-Host Resource Estimates: Memory=90.00MB F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 PLAN-ROOT SINK @@ -337,7 +337,7 @@ PLAN-ROOT SINK | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 01:AGGREGATE -| output: count(*) +| output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows) | mem-estimate=10.00MB mem-reservation=0B | tuple-ids=1 row-size=8B cardinality=1 | @@ -346,8 +346,8 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3 stats-rows=6001215 extrapolated-rows=disabled table stats: rows=6001215 size=193.92MB column stats: all - mem-estimate=0B mem-reservation=0B - tuple-ids=0 row-size=0B cardinality=6001215 + mem-estimate=80.00MB mem-reservation=0B + tuple-ids=0 row-size=8B cardinality=6001215 ---- PARALLELPLANS Per-Host Resource Reservation: Memory=0B Per-Host Resource Estimates: Memory=180.00MB @@ -367,7 +367,7 @@ PLAN-ROOT SINK | F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6 01:AGGREGATE -| output: count(*) +| output: sum_init_zero(tpch_parquet.lineitem.parquet-stats: num_rows) | mem-estimate=10.00MB mem-reservation=0B | tuple-ids=1 row-size=8B cardinality=1 | @@ -377,7 +377,7 @@ F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6 table stats: rows=6001215 size=193.92MB column stats: all mem-estimate=80.00MB mem-reservation=0B - tuple-ids=0 row-size=0B cardinality=6001215 + tuple-ids=0 row-size=8B cardinality=6001215 ==== # Sort select * diff --git a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test index 524c63bb5..de1c50707 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/aggregation.test +++ b/testdata/workloads/functional-query/queries/QueryTest/aggregation.test @@ -1255,3 +1255,67 @@ where t2.int_col IN 
(t1.int_col_1, t1.int_col) ---- TYPES TIMESTAMP,BIGINT ==== +---- QUERY +# IMPALA-5036: Tests the correctness of the Parquet count(*) optimization. +select count(1) +from functional_parquet.alltypes +---- RESULTS +7300 +---- TYPES +bigint +===== +---- QUERY +# IMPALA-5036: Parquet count(*) optimization with predicates on the partition columns. +select count(1) +from functional_parquet.alltypes where year < 2010 and month > 8 +---- RESULTS +1220 +---- TYPES +bigint +===== +---- QUERY +# IMPALA-5036: Parquet count(*) optimization with group by partition columns. +select year, month, count(1) +from functional_parquet.alltypes where month > 10 group by year, month +---- RESULTS +2009,11,300 +2009,12,310 +2010,11,300 +2010,12,310 +---- TYPES +int, int, bigint +===== +---- QUERY +# IMPALA-5036: Parquet count(*) optimization with both group by and predicates on +# partition columns. +select count(1) +from functional_parquet.alltypes where year < 2010 and month > 8 +group by month +---- RESULTS +310 +300 +310 +300 +---- TYPES +bigint +===== +---- QUERY +# IMPALA-5036: Parquet count(*) optimization with the result of the going into a join. +select x.bigint_col from functional.alltypes x + inner join ( + select count(1) as a from functional_parquet.alltypes group by year + ) t on x.id = t.a; +---- RESULTS +0 +0 +---- TYPES +bigint +===== +---- QUERY +# IMPALA-5036: Parquet count(*) optimization with the agg function in the having clause. +select 1 from functional_parquet.alltypes having count(*) > 1 +---- RESULTS +1 +---- TYPES +tinyint +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test new file mode 100644 index 000000000..3b1c33bd6 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test @@ -0,0 +1,117 @@ +==== +---- QUERY +# Tests the correctness of the Parquet count(*) optimization. +select count(1) +from functional_parquet.alltypes +---- RESULTS +7300 +---- TYPES +bigint +===== +---- QUERY +# Parquet count(*) optimization with predicates on the partition columns. +select count(1) +from functional_parquet.alltypes where year < 2010 and month > 8 +---- RESULTS +1220 +---- TYPES +bigint +===== +---- QUERY +# Parquet count(*) optimization with group by partition columns. +select year, month, count(1) +from functional_parquet.alltypes group by year, month +---- RESULTS +2009,1,310 +2009,2,280 +2009,3,310 +2009,4,300 +2009,5,310 +2009,6,300 +2009,7,310 +2009,8,310 +2009,9,300 +2009,10,310 +2009,11,300 +2009,12,310 +2010,1,310 +2010,2,280 +2010,3,310 +2010,4,300 +2010,5,310 +2010,6,300 +2010,7,310 +2010,8,310 +2010,9,300 +2010,10,310 +2010,11,300 +2010,12,310 +---- TYPES +int, int, bigint +===== +---- QUERY +# Parquet count(*) optimization with both group by and predicates on partition columns. +select count(1) +from functional_parquet.alltypes where year < 2010 and month > 8 +group by month +---- RESULTS +310 +300 +310 +300 +---- TYPES +bigint +===== +---- QUERY +# Parquet count(*) optimization with the result going into a join. +select x.bigint_col from functional.alltypes x + inner join ( + select count(1) as a from functional_parquet.alltypes group by year + ) t on x.id = t.a; +---- RESULTS +0 +0 +---- TYPES +bigint +===== +---- QUERY +# Parquet count(*) optimization with the agg function in the having clause. 
+select 1 from functional_parquet.alltypes having count(*) > 1 +---- RESULTS +1 +---- TYPES +tinyint +==== +---- QUERY +# Verify that 0 is returned for count(*) on an empty table. +select count(1) from functional_parquet.emptytable +---- RESULTS +0 +---- TYPES +bigint +===== +---- QUERY +# Verify that 0 is returned when all partitions are pruned. +select count(1) from functional_parquet.alltypes where year = -1 +---- RESULTS +0 +---- TYPES +bigint +===== +---- QUERY +# Test different row group size combinations. +select count(*) from functional_parquet.lineitem_multiblock +union all +select count(*) from functional_parquet.lineitem_multiblock_one_row_group +union all +select count(*) from functional_parquet.lineitem_sixblocks +union all +select count(*) from tpch_parquet.lineitem +---- RESULTS +20000 +40000 +40000 +6001215 +---- TYPES +bigint +===== diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test similarity index 100% rename from testdata/workloads/functional-query/queries/QueryTest/parquet_stats.test rename to testdata/workloads/functional-query/queries/QueryTest/parquet-stats.test diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py index a257bc08e..289a8670b 100644 --- a/tests/query_test/test_aggregation.py +++ b/tests/query_test/test_aggregation.py @@ -271,6 +271,15 @@ class TestAggregationQueries(ImpalaTestSuite): # Verify codegen was enabled for all four stages of the aggregation. assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6]) + def test_parquet_count_star_optimization(self, vector): + if (vector.get_value('table_format').file_format != 'text' or + vector.get_value('table_format').compression_codec != 'none'): + # No need to run this test on all file formats + pytest.skip() + self.run_test_case('QueryTest/parquet-stats-agg', vector) + vector.get_value('exec_option')['batch_size'] = 1 + self.run_test_case('QueryTest/parquet-stats-agg', vector) + class TestWideAggregationQueries(ImpalaTestSuite): """Test that aggregations with many grouping columns work""" @classmethod diff --git a/tests/query_test/test_parquet_stats.py b/tests/query_test/test_parquet_stats.py index 93fc06d95..00afd09d9 100644 --- a/tests/query_test/test_parquet_stats.py +++ b/tests/query_test/test_parquet_stats.py @@ -46,7 +46,7 @@ class TestParquetStats(ImpalaTestSuite): # The test makes assumptions about the number of row groups that are processed and # skipped inside a fragment, so we ensure that the tests run in a single fragment. vector.get_value('exec_option')['num_nodes'] = 1 - self.run_test_case('QueryTest/parquet_stats', vector, use_db=unique_database) + self.run_test_case('QueryTest/parquet-stats', vector, use_db=unique_database) def test_deprecated_stats(self, vector, unique_database): """Test that reading parquet files with statistics with deprecated 'min'/'max' fields @@ -69,4 +69,3 @@ class TestParquetStats(ImpalaTestSuite): # skipped inside a fragment, so we ensure that the tests run in a single fragment. vector.get_value('exec_option')['num_nodes'] = 1 self.run_test_case('QueryTest/parquet-deprecated-stats', vector, unique_database) -
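Taken together with the frontend changes, the net effect of this patch is that an eligible count(*) is substituted with sum_init_zero over the new count-star slot, and the scan feeds that slot with one num_rows value per row group. The sketch below illustrates, under a deliberately simplified accumulator model (it is not the actual UDA framework), why the builtin is registered with an init-to-zero function: an empty input must still produce 0 rather than NULL, which is what a plain sum() would return.

// Hypothetical sketch, not Impala code: the summation semantics of sum_init_zero.
import java.util.Arrays;
import java.util.List;

public class SumInitZeroSketch {
  static long sumInitZero(List<Long> perRowGroupNumRows) {
    long acc = 0;                                            // InitZero: start at 0, not NULL
    for (long numRows : perRowGroupNumRows) acc += numRows;  // SumUpdate per input value
    return acc;
  }

  public static void main(String[] args) {
    System.out.println(sumInitZero(Arrays.asList(100L, 250L, 42L)));  // 392 == count(*)
    System.out.println(sumInitZero(Arrays.<Long>asList()));           // 0, matching count(*) over no rows
  }
}

This lines up with the planner tests above: scans that cannot use the rewrite (non-Parquet tables, non-partition predicates, multiple table refs, count(NULL), empty or fully pruned tables) keep the ordinary count(*) aggregate.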