Revert "IMPALA-11123: Optimize count(star) for ORC scans"

This reverts commit f932d78ad0.

The commit is reverted because it caused a significant regression for
non-optimized count(*) queries in Parquet format.
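
For context, a sketch of the two query shapes involved, using tables from the
planner tests touched by this revert: the optimized path answers count(*) from
the file footer's row-count statistic, while a count(*) with a predicate on a
non-partition column must materialize rows and stays on the non-optimized path
that regressed.

-- Optimized path: the planner emits sum_init_zero(<table>.stats: num_rows)
-- and the scan is served from file metadata.
select count(*) from functional_parquet.alltypes;

-- Non-optimized path: tinyint_col is not a partition column, so rows must
-- be read; this is the shape whose performance regressed.
select count(*) from functional_parquet.alltypes where tinyint_col > 8;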

There are several conflicts that need to be resolved manually:
- Removed the assertion against the 'NumFileMetadataRead' counter, which
  is lost with the revert.
- Adjusted the assertions in test_plain_count_star_optimization,
  test_in_predicate_push_down, and test_partitioned_insert of
  test_iceberg.py, since the improvement in the Parquet optimized count
  star code path is no longer present.
- Kept the "override" specifier in hdfs-parquet-scanner.h to pass
  clang-tidy.
- Kept the python3 style of RuntimeError instantiation in
  test_file_parser.py to pass check-python-syntax.sh.

Change-Id: Iefd8fd0838638f9db146f7b706e541fe2aaf01c1
Reviewed-on: http://gerrit.cloudera.org:8080/19843
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Wenzhe Zhou <wzhou@cloudera.com>
Authored by Riza Suminto on 2023-05-04 14:45:43 -07:00
Committed by Wenzhe Zhou
parent 4d9f50eb74
commit 7ca20b3c94
39 changed files with 269 additions and 1104 deletions

View File

@@ -64,27 +64,20 @@ PROFILE_DEFINE_COUNTER(IoReadTotalBytes, DEBUG, TUnit::BYTES,
"The total number of bytes read from streams.");
PROFILE_DEFINE_COUNTER(IoReadSkippedBytes, DEBUG, TUnit::BYTES,
"The total number of bytes skipped from streams.");
PROFILE_DEFINE_COUNTER(NumFileMetadataRead, DEBUG, TUnit::UNIT,
"The total number of file metadata reads done in place of rows or row groups / "
"stripe iteration.");
const char* HdfsColumnarScanner::LLVM_CLASS_NAME = "class.impala::HdfsColumnarScanner";
HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: HdfsScanner(scan_node, state),
HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node,
RuntimeState* state) :
HdfsScanner(scan_node, state),
scratch_batch_(new ScratchTupleBatch(
*scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())),
assemble_rows_timer_(scan_node->materialize_tuple_timer()) {}
*scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())) {
}
HdfsColumnarScanner::~HdfsColumnarScanner() {}
Status HdfsColumnarScanner::Open(ScannerContext* context) {
RETURN_IF_ERROR(HdfsScanner::Open(context));
// Memorize 'is_footer_scanner_' here since 'stream_' can be released early.
const io::ScanRange* range = stream_->scan_range();
is_footer_scanner_ =
range->offset() + range->bytes_to_read() >= stream_->file_desc()->file_length;
RuntimeProfile* profile = scan_node_->runtime_profile();
num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile);
num_scanners_with_no_reads_counter_ =
@@ -102,7 +95,6 @@ Status HdfsColumnarScanner::Open(ScannerContext* context) {
io_total_request_ = PROFILE_IoReadTotalRequest.Instantiate(profile);
io_total_bytes_ = PROFILE_IoReadTotalBytes.Instantiate(profile);
io_skipped_bytes_ = PROFILE_IoReadSkippedBytes.Instantiate(profile);
num_file_metadata_read_ = PROFILE_NumFileMetadataRead.Instantiate(profile);
return Status::OK();
}
@@ -298,60 +290,6 @@ Status HdfsColumnarScanner::DivideReservationBetweenColumns(
return Status::OK();
}
Status HdfsColumnarScanner::GetNextWithCountStarOptimization(RowBatch* row_batch) {
// There are no materialized slots, e.g. count(*) over the table. We can serve
// this query from just the file metadata. We don't need to read the column data.
// Only scanner of the footer split will run in this case. See the logic in
// HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly().
DCHECK(is_footer_scanner_);
int64_t tuple_buffer_size;
uint8_t* tuple_buffer;
int capacity = 1;
RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_,
row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), &capacity,
&tuple_buffer_size, &tuple_buffer));
int64_t num_rows = GetNumberOfRowsInFile();
COUNTER_ADD(num_file_metadata_read_, 1);
DCHECK_LE(rows_read_in_group_, num_rows);
Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buffer);
InitTuple(template_tuple_, dst_tuple);
int64_t* dst_slot = dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset());
*dst_slot = num_rows;
TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
dst_row->SetTuple(0, dst_tuple);
row_batch->CommitLastRow();
rows_read_in_group_ += num_rows;
eos_ = true;
return Status::OK();
}
Status HdfsColumnarScanner::GetNextWithTemplateTuple(RowBatch* row_batch) {
// There are no materialized slots, e.g. "select 1" over the table. We can serve
// this query from just the file metadata. We don't need to read the column data.
// Only scanner of the footer split will run in this case. See the logic in
// HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly().
// We might also get here for count(*) query against full acid table such as:
// "select count(*) from functional_orc_def.alltypes;"
DCHECK(is_footer_scanner_);
int64_t file_rows = GetNumberOfRowsInFile();
COUNTER_ADD(num_file_metadata_read_, 1);
if (rows_read_in_group_ == file_rows) {
eos_ = true;
return Status::OK();
}
assemble_rows_timer_.Start();
DCHECK_LT(rows_read_in_group_, file_rows);
int64_t rows_remaining = file_rows - rows_read_in_group_;
int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
Status status = CommitRows(num_to_commit, row_batch);
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(status);
rows_read_in_group_ += max_tuples;
return Status::OK();
}
void HdfsColumnarScanner::AddSyncReadBytesCounter(int64_t total_bytes) {
io_sync_request_->Add(1);
io_total_request_->Add(1);

View File

@@ -60,25 +60,6 @@ class HdfsColumnarScanner : public HdfsScanner {
/// top-level tuples. See AssembleRows() in the derived classes.
boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
/// Timer for materializing rows. This ignores time getting the next buffer.
ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
/// Index of the current row group / stripe being processed. Initialized to -1 which
/// indicates that we have not started processing the first group yet (GetNext() has
/// not yet been called).
int32_t group_idx_ = -1;
/// Counts the number of rows processed for the current row group / stripe.
int64_t rows_read_in_group_ = 0;
/// Indicates whether we should advance to the next row group / stripe in the next
/// GetNext(). Starts out as true to move to the very first row group.
bool advance_group_ = true;
/// Indicate whether this is a footer scanner or not.
/// Assigned in HdfsColumnarScanner::Open().
bool is_footer_scanner_ = false;
/// Scan range for the metadata.
const io::ScanRange* metadata_range_ = nullptr;
@@ -119,9 +100,6 @@ class HdfsColumnarScanner : public HdfsScanner {
Status DivideReservationBetweenColumns(const ColumnRangeLengths& col_range_lengths,
ColumnReservations& reservation_per_column);
/// Get the number of rows in file.
virtual int64_t GetNumberOfRowsInFile() const = 0;
/// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for
/// dividing a reservation of 'reservation_to_distribute' bytes between columns with
/// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
@@ -136,16 +114,6 @@ class HdfsColumnarScanner : public HdfsScanner {
/// in ExecEnv.
static int64_t ComputeIdealReservation(const ColumnRangeLengths& col_range_lengths);
/// Handle count(*) queries by reading the row count from the footer statistics.
/// The optimization is possible only in simpler cases, e.g. when there are no conjuncts.
/// Check ScanNode.java#canApplyCountStarOptimization for full detail.
Status GetNextWithCountStarOptimization(RowBatch* row_batch);
/// Handle zero slot scan queries by reading the row count from the footer statistics.
/// Possible queries include "select 1" or "select count(*)" over full acid table that
/// does not require row validation.
Status GetNextWithTemplateTuple(RowBatch* row_batch);
/// Number of columns that need to be read.
RuntimeProfile::Counter* num_cols_counter_;
@@ -174,11 +142,6 @@ class HdfsColumnarScanner : public HdfsScanner {
/// Total number of bytes skipped during stream reading.
RuntimeProfile::Counter* io_skipped_bytes_;
/// Total file metadata reads done.
/// Incremented when serving query from metadata instead of iterating rows or
/// row groups / stripes.
RuntimeProfile::Counter* num_file_metadata_read_;
private:
int ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_batch);
};

View File

@@ -450,8 +450,9 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno
hdfs_scan_node.skip_header_line_count :
0),
tuple_id_(pnode.tuple_id_),
count_star_slot_offset_(hdfs_scan_node.__isset.count_star_slot_offset ?
hdfs_scan_node.count_star_slot_offset :
parquet_count_star_slot_offset_(
hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
hdfs_scan_node.parquet_count_star_slot_offset :
-1),
is_partition_key_scan_(hdfs_scan_node.is_partition_key_scan),
tuple_desc_(pnode.tuple_desc_),

View File

@@ -30,7 +30,6 @@
#include <boost/unordered_map.hpp>
#include "codegen/codegen-fn-ptr.h"
#include "exec/acid-metadata-utils.h"
#include "exec/file-metadata-utils.h"
#include "exec/filter-context.h"
#include "exec/scan-node.h"
@@ -464,12 +463,10 @@ class HdfsScanNodeBase : public ScanNode {
const AvroSchemaElement& avro_schema() const { return avro_schema_; }
int skip_header_line_count() const { return skip_header_line_count_; }
io::RequestContext* reader_context() const { return reader_context_.get(); }
bool optimize_count_star() const {
bool is_optimized = count_star_slot_offset_ != -1;
DCHECK(!hdfs_table_->IsTableFullAcid() || !is_optimized);
return is_optimized;
bool optimize_parquet_count_star() const {
return parquet_count_star_slot_offset_ != -1;
}
int count_star_slot_offset() const { return count_star_slot_offset_; }
int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; }
bool is_partition_key_scan() const { return is_partition_key_scan_; }
typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
@@ -601,32 +598,6 @@ class HdfsScanNodeBase : public ScanNode {
virtual_column_slots().empty();
}
/// Return true if scan over 'filename' requires row validation.
/// Hive Streaming Ingestion allocates multiple write ids, hence creates delta
/// directories like delta_5_10. Then it continuously appends new stripes (and footers)
/// to the ORC files of the delta dir. So it's possible that the file has rows that
/// Impala is not allowed to see based on its valid write id list. In such cases we need
/// to validate the write ids of the row batches.
inline bool RequireRowValidation(std::string filename) const {
if (!hdfs_table()->IsTableFullAcid()) return false;
if (ValidWriteIdList::IsCompacted(filename)) return false;
ValidWriteIdList valid_write_ids;
std::pair<int64_t, int64_t> acid_write_id_range =
valid_write_ids.GetWriteIdRange(filename);
valid_write_ids.InitFrom(hdfs_table()->ValidWriteIdList());
ValidWriteIdList::RangeResponse rows_valid = valid_write_ids.IsWriteIdRangeValid(
acid_write_id_range.first, acid_write_id_range.second);
DCHECK_NE(rows_valid, ValidWriteIdList::NONE);
return rows_valid == ValidWriteIdList::SOME;
}
/// Return true if scan over 'filename' can be served only by reading the file metadata,
/// such as a count(*) over the table.
inline bool ReadsFileMetadataOnly(std::string filename) const {
return !RequireRowValidation(filename)
&& (IsZeroSlotTableScan() || optimize_count_star());
}
/// Transfers all memory from 'pool' to shared state of all scanners.
void TransferToSharedStatePool(MemPool* pool);
@@ -715,11 +686,11 @@ class HdfsScanNodeBase : public ScanNode {
/// Tuple id of the tuple descriptor to be used.
const int tuple_id_;
/// The byte offset of the slot for Parquet/ORC metadata if count star optimization
/// The byte offset of the slot for Parquet metadata if Parquet count star optimization
/// is enabled. When set, this scan node can optimize a count(*) query by populating
/// the tuple with data from the Parquet num rows statistic. See
/// applyCountStarOptimization() in ScanNode.java.
const int count_star_slot_offset_;
/// applyParquetCountStartOptimization() in HdfsScanNode.java.
const int parquet_count_star_slot_offset_;
// True if this is a partition key scan that needs only to return at least one row from
// each scan range. If true, the scan node and scanner implementations should attempt

View File

@@ -873,13 +873,12 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
ScanRange* split = files[i]->splits[j];
DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
// If scan only reads file metadata (such as count(*) over the table), we can
// If there are no materialized slots (such as count(*) over the table), we can
// get the result with the file metadata alone and don't need to read any row
// groups. We only want a single node to process the file footer in this case,
// which is the node with the footer split. If it's not a count(*), we create a
// footer range for the split always.
if (!scan_node->ReadsFileMetadataOnly(files[i]->filename)
|| footer_split == split) {
if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
ScanRangeMetadata* split_metadata =
static_cast<ScanRangeMetadata*>(split->meta_data());
// Each split is processed by first issuing a scan range for the file footer, which

View File

@@ -311,7 +311,8 @@ HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: HdfsColumnarScanner(scan_node, state),
dictionary_pool_(new MemPool(scan_node->mem_tracker())),
data_batch_pool_(new MemPool(scan_node->mem_tracker())),
search_args_pool_(new MemPool(scan_node->mem_tracker())) {
search_args_pool_(new MemPool(scan_node->mem_tracker())),
assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
assemble_rows_timer_.Stop();
}
@@ -405,9 +406,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
}
if (scan_node_->optimize_count_star() && !row_batches_need_validation_) {
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
return Status::OK();
if (UNLIKELY(scan_node_->optimize_parquet_count_star())) {
DCHECK(false);
return Status("Internal ERROR: ORC scanner cannot optimize count star slot.");
}
// Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
@@ -777,20 +778,29 @@ Status HdfsOrcScanner::ProcessSplit() {
}
Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
if (row_batches_need_validation_) {
// In case 'row_batches_need_validation_' is true, we need to look at the row
// batches and check their validity. This might be a zero slot scan, in which
// case 'currentTransaction' is the only selected field from the file. This should
// not be an optimized count(*), because the optimization is disabled for full ACID tables.
DCHECK(!scan_node_->optimize_count_star());
} else if (scan_node_->optimize_count_star()) {
// This is an optimized count(*) case.
// For each file, populate one slot with the footer's numberOfRows statistic.
return GetNextWithCountStarOptimization(row_batch);
} else if (scan_node_->IsZeroSlotTableScan()) {
// There are no materialized slots, e.g. "select 1" over the table. We can serve
// In case 'row_batches_need_validation_' is true, we need to look at the row
// batches and check their validity. In that case 'currentTransaction' is the only
// selected field from the file (in case of zero slot scans).
if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) {
uint64_t file_rows = reader_->getNumberOfRows();
// There are no materialized slots, e.g. count(*) over the table. We can serve
// this query from just the file metadata. We don't need to read the column data.
return GetNextWithTemplateTuple(row_batch);
if (stripe_rows_read_ == file_rows) {
eos_ = true;
return Status::OK();
}
assemble_rows_timer_.Start();
DCHECK_LT(stripe_rows_read_, file_rows);
int64_t rows_remaining = file_rows - stripe_rows_read_;
int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
Status status = CommitRows(num_to_commit, row_batch);
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(status);
stripe_rows_read_ += max_tuples;
COUNTER_ADD(scan_node_->rows_read_counter(), num_to_commit);
return Status::OK();
}
if (!scratch_batch_->AtEnd()) {
@@ -822,7 +832,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
// 'advance_stripe_' is updated in 'NextStripe', meaning the current stripe we advance
// to can be skip. 'end_of_stripe_' marks whether current stripe is drained. It's only
// set to true in 'AssembleRows'.
while (advance_group_ || end_of_stripe_) {
while (advance_stripe_ || end_of_stripe_) {
// The next stripe will use a new dictionary blob so transfer the memory to row_batch.
row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
context_->ReleaseCompletedResources(/* done */ true);
@@ -830,8 +840,8 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
RETURN_IF_ERROR(CommitRows(0, row_batch));
RETURN_IF_ERROR(NextStripe());
DCHECK_LE(group_idx_, reader_->getNumberOfStripes());
if (group_idx_ == reader_->getNumberOfStripes()) {
DCHECK_LE(stripe_idx_, reader_->getNumberOfStripes());
if (stripe_idx_ == reader_->getNumberOfStripes()) {
eos_ = true;
DCHECK(parse_status_.ok());
return Status::OK();
@@ -873,19 +883,19 @@ Status HdfsOrcScanner::NextStripe() {
int64_t split_offset = split_range->offset();
int64_t split_length = split_range->len();
bool start_with_first_stripe = group_idx_ == -1;
bool start_with_first_stripe = stripe_idx_ == -1;
bool misaligned_stripe_skipped = false;
advance_group_ = false;
rows_read_in_group_ = 0;
advance_stripe_ = false;
stripe_rows_read_ = 0;
// Loop until we have found a non-empty stripe.
while (true) {
// Reset the parse status for the next stripe.
parse_status_ = Status::OK();
++group_idx_;
if (group_idx_ >= reader_->getNumberOfStripes()) {
++stripe_idx_;
if (stripe_idx_ >= reader_->getNumberOfStripes()) {
if (start_with_first_stripe && misaligned_stripe_skipped) {
// We started with the first stripe and skipped all the stripes because they were
// misaligned. The execution flow won't reach this point if there is at least one
@@ -894,7 +904,7 @@ Status HdfsOrcScanner::NextStripe() {
}
break;
}
unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(group_idx_);
unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(stripe_idx_);
// Also check 'footer_.numberOfRows' to make sure 'select count(*)' and 'select *'
// behave consistently for corrupt files that have 'footer_.numberOfRows == 0'
// but some data in stripe.
@@ -967,7 +977,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
if (row_batch->AtCapacity()) break;
continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
}
rows_read_in_group_ += num_rows_read;
stripe_rows_read_ += num_rows_read;
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
// Merge Scanner-local counter into HdfsScanNode counter and reset.
COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);

View File

@@ -185,11 +185,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
return THdfsFileFormat::ORC;
}
protected:
virtual int64_t GetNumberOfRowsInFile() const override {
return static_cast<int64_t>(reader_->getNumberOfRows());
}
private:
friend class OrcColumnReader;
friend class OrcDateColumnReader;
@@ -200,10 +195,23 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
friend class OrcStructReader;
friend class OrcListReader;
friend class OrcMapReader;
friend class HdfsOrcScannerTest;
/// Memory guard of the tuple_mem_
uint8_t* tuple_mem_end_ = nullptr;
/// Index of the current stripe being processed. Stripe in ORC is equivalent to
/// RowGroup in Parquet. Initialized to -1 which indicates that we have not started
/// processing the first stripe yet (GetNext() has not yet been called).
int32_t stripe_idx_ = -1;
/// Counts the number of rows processed for the current stripe.
int64_t stripe_rows_read_ = 0;
/// Indicates whether we should advance to the next stripe in the next GetNext().
/// Starts out as true to move to the very first stripe.
bool advance_stripe_ = true;
/// Indicates whether we are at the end of a stripe.
bool end_of_stripe_ = true;
@@ -288,6 +296,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// offset, and there are no two overlapping range.
vector<ColumnRange> columnRanges_;
/// Timer for materializing rows. This ignores time getting the next buffer.
ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
/// Number of stripes that need to be read.
RuntimeProfile::Counter* num_stripes_counter_ = nullptr;

View File

@@ -337,8 +337,8 @@ class OrcStringColumnReader : public OrcPrimitiveColumnReader<OrcStringColumnRea
}
DCHECK(static_cast<orc::EncodedStringVectorBatch*>(batch_) ==
dynamic_cast<orc::EncodedStringVectorBatch*>(orc_batch));
if (last_stripe_idx_ != scanner_->group_idx_) {
last_stripe_idx_ = scanner_->group_idx_;
if (last_stripe_idx_ != scanner_->stripe_idx_) {
last_stripe_idx_ = scanner_->stripe_idx_;
auto current_batch = static_cast<orc::EncodedStringVectorBatch*>(batch_);
return InitBlob(&current_batch->dictionary->dictionaryBlob,
scanner_->dictionary_pool_.get());

View File

@@ -86,10 +86,14 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
: HdfsColumnarScanner(scan_node, state),
row_group_idx_(-1),
row_group_rows_read_(0),
advance_row_group_(true),
min_max_tuple_(nullptr),
row_batches_produced_(0),
dictionary_pool_(new MemPool(scan_node->mem_tracker())),
stats_batch_read_pool_(new MemPool(scan_node->mem_tracker())),
assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
num_stats_filtered_row_groups_counter_(nullptr),
num_minmax_filtered_row_groups_counter_(nullptr),
num_bloom_filtered_row_groups_counter_(nullptr),
@@ -379,7 +383,7 @@ static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group,
int HdfsParquetScanner::CountScalarColumns(
const vector<ParquetColumnReader*>& column_readers) {
DCHECK(!column_readers.empty() || scan_node_->optimize_count_star());
DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
int num_columns = 0;
stack<ParquetColumnReader*> readers;
for (ParquetColumnReader* r: column_readers_) readers.push(r);
@@ -429,15 +433,55 @@ Status HdfsParquetScanner::ProcessSplit() {
Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
if (scan_node_->optimize_count_star()) {
// This is an optimized count(*) case.
if (scan_node_->optimize_parquet_count_star()) {
// Populate the single slot with the Parquet num rows statistic.
return GetNextWithCountStarOptimization(row_batch);
int64_t tuple_buf_size;
uint8_t* tuple_buf;
// We try to allocate a smaller row batch here because in most cases the number of row
// groups in a file is much lower than the default row batch capacity.
int capacity = min(
static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity());
RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
&capacity, &tuple_buf_size, &tuple_buf));
while (!row_batch->AtCapacity()) {
RETURN_IF_ERROR(NextRowGroup());
DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
if (row_group_idx_ == file_metadata_.row_groups.size()) break;
Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
InitTuple(template_tuple_, dst_tuple);
int64_t* dst_slot =
dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
*dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
row_group_rows_read_ += *dst_slot;
dst_row->SetTuple(0, dst_tuple);
row_batch->CommitLastRow();
tuple_buf += scan_node_->tuple_desc()->byte_size();
}
eos_ = row_group_idx_ == file_metadata_.row_groups.size();
return Status::OK();
} else if (scan_node_->IsZeroSlotTableScan()) {
// There are no materialized slots and we are not optimizing count(*), e.g.
// "select 1 from alltypes". We can serve this query from just the file metadata.
// We don't need to read the column data.
return GetNextWithTemplateTuple(row_batch);
if (row_group_rows_read_ == file_metadata_.num_rows) {
eos_ = true;
return Status::OK();
}
assemble_rows_timer_.Start();
DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
Status status = CommitRows(row_batch, num_to_commit);
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(status);
row_group_rows_read_ += max_tuples;
COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
return Status::OK();
}
// Transfer remaining tuples from the scratch batch.
@@ -449,18 +493,18 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
if (row_batch->AtCapacity()) return Status::OK();
}
while (advance_group_ || column_readers_[0]->RowGroupAtEnd()) {
while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
// Transfer resources and clear streams if there is any leftover from the previous
// row group. We will create new streams for the next row group.
FlushRowGroupResources(row_batch);
if (!advance_group_) {
if (!advance_row_group_) {
Status status =
ValidateEndOfRowGroup(column_readers_, group_idx_, rows_read_in_group_);
ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_);
if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
}
RETURN_IF_ERROR(NextRowGroup());
DCHECK_LE(group_idx_, file_metadata_.row_groups.size());
if (group_idx_ == file_metadata_.row_groups.size()) {
DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
if (row_group_idx_ == file_metadata_.row_groups.size()) {
eos_ = true;
DCHECK(parse_status_.ok());
return Status::OK();
@@ -481,9 +525,9 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
assemble_rows_timer_.Start();
Status status;
if (filter_pages_) {
status = AssembleRows<true>(row_batch, &advance_group_);
status = AssembleRows<true>(row_batch, &advance_row_group_);
} else {
status = AssembleRows<false>(row_batch, &advance_group_);
status = AssembleRows<false>(row_batch, &advance_row_group_);
}
assemble_rows_timer_.Stop();
RETURN_IF_ERROR(status);
@@ -819,11 +863,11 @@ Status HdfsParquetScanner::NextRowGroup() {
const HdfsFileDesc* file_desc =
scan_node_->GetFileDesc(context_->partition_descriptor()->id(), filename());
bool start_with_first_row_group = group_idx_ == -1;
bool start_with_first_row_group = row_group_idx_ == -1;
bool misaligned_row_group_skipped = false;
advance_group_ = false;
rows_read_in_group_ = 0;
advance_row_group_ = false;
row_group_rows_read_ = 0;
// Loop until we have found a non-empty row group, and successfully initialized and
// seeded the column readers. Return a non-OK status from within loop only if the error
@@ -835,8 +879,8 @@ Status HdfsParquetScanner::NextRowGroup() {
// or previous row groups.
DCHECK_EQ(0, context_->NumStreams());
++group_idx_;
if (group_idx_ >= file_metadata_.row_groups.size()) {
++row_group_idx_;
if (row_group_idx_ >= file_metadata_.row_groups.size()) {
if (start_with_first_row_group && misaligned_row_group_skipped) {
// We started with the first row group and skipped all the row groups because
// they were misaligned. The execution flow won't reach this point if there is at
@@ -845,7 +889,7 @@ Status HdfsParquetScanner::NextRowGroup() {
}
break;
}
const parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
// Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *'
// behave consistently for corrupt files that have 'file_metadata_.num_rows == 0'
// but some data in row groups.
@@ -854,7 +898,7 @@ Status HdfsParquetScanner::NextRowGroup() {
// Let's find the index of the first row in this row group. It's needed to track the
// file position of each row.
int64_t row_group_first_row = 0;
for (int i = 0; i < group_idx_; ++i) {
for (int i = 0; i < row_group_idx_; ++i) {
const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
row_group_first_row += row_group.num_rows;
}
@@ -1280,7 +1324,7 @@ Status HdfsParquetScanner::ProcessPageIndex() {
MonotonicStopWatch single_process_page_index_timer;
single_process_page_index_timer.Start();
ResetPageFiltering();
RETURN_IF_ERROR(page_index_.ReadAll(group_idx_));
RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_));
if (page_index_.IsEmpty()) return Status::OK();
// We can release the raw page index buffer when we exit this function.
const auto scope_exit = MakeScopeExitTrigger([this](){page_index_.Release();});
@@ -1404,7 +1448,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
}
min_max_tuple_->Init(min_max_tuple_desc->byte_size());
parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
int filtered_pages = 0;
@@ -1507,7 +1551,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
}
Status HdfsParquetScanner::EvaluatePageIndex() {
parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
vector<RowRange> skip_ranges;
for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
@@ -1597,7 +1641,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
Status HdfsParquetScanner::ComputeCandidatePagesForColumns() {
if (candidate_ranges_.empty()) return Status::OK();
parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
const auto& page_locations = scalar_reader->offset_index_.page_locations;
if (!ComputeCandidatePages(page_locations, candidate_ranges_, row_group.num_rows,
@@ -2186,8 +2230,7 @@ Status HdfsParquetScanner::ProcessBloomFilter(const parquet::RowGroup&
if (!bloom_filter.Find(hash)) {
*skip_row_group = true;
VLOG(3) << Substitute("Row group with idx $0 filtered by Parquet Bloom filter on "
"column with idx $1 in file $2.",
group_idx_, col_idx, filename());
"column with idx $1 in file $2.", row_group_idx_, col_idx, filename());
return Status::OK();
}
}
@@ -2261,7 +2304,7 @@ Status HdfsParquetScanner::AssembleRowsWithoutLateMaterialization(
RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
if (row_batch->AtCapacity()) break;
}
rows_read_in_group_ += num_rows_read;
row_group_rows_read_ += num_rows_read;
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
// Merge Scanner-local counter into HdfsScanNode counter and reset.
COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
@@ -2388,7 +2431,7 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_grou
break;
}
}
rows_read_in_group_ += num_rows_read;
row_group_rows_read_ += num_rows_read;
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
// Merge Scanner-local counter into HdfsScanNode counter and reset.
COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
@@ -2788,7 +2831,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
DCHECK(column_readers != nullptr);
DCHECK(column_readers->empty());
if (scan_node_->optimize_count_star()) {
if (scan_node_->optimize_parquet_count_star()) {
// Column readers are not needed because we are not reading from any columns if this
// optimization is enabled.
return Status::OK();
@@ -2955,7 +2998,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t row_group_first_row) {
int64_t partition_id = context_->partition_descriptor()->id();
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
DCHECK(file_desc != nullptr);
parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
// Used to validate that the number of values in each reader in column_readers_ at the
// same SchemaElement is the same.
@@ -2975,7 +3018,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t row_group_first_row) {
return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
col_chunk.meta_data.num_values, num_values, filename());
}
RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, group_idx_,
RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_,
row_group_first_row));
}

View File

@@ -50,7 +50,7 @@ class ParquetColumnReader;
class ParquetPageReader;
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
class ScalarColumnReader;
class BoolColumnReader;
/// This scanner parses Parquet files located in HDFS, and writes the content as tuples in
/// the Impala in-memory representation of data, e.g. (tuples, rows, row batches).
@@ -394,22 +394,30 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
"You can increase PARQUET_FOOTER_SIZE if you want, "
"just don't forget to increase READ_SIZE_MIN_VALUE as well.");
protected:
virtual int64_t GetNumberOfRowsInFile() const override {
return file_metadata_.num_rows;
}
private:
friend class ParquetColumnReader;
friend class CollectionColumnReader;
friend class BaseScalarColumnReader;
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
friend class ScalarColumnReader;
friend class BoolColumnReader;
friend class HdfsParquetScannerTest;
friend class ParquetPageIndex;
friend class ParquetColumnChunkReader;
friend class ParquetPageReader;
/// Index of the current row group being processed. Initialized to -1 which indicates
/// that we have not started processing the first row group yet (GetNext() has not yet
/// been called).
int32_t row_group_idx_;
/// Counts the number of rows processed for the current row group.
int64_t row_group_rows_read_;
/// Indicates whether we should advance to the next row group in the next GetNext().
/// Starts out as true to move to the very first row group.
bool advance_row_group_;
boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
/// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_.
@@ -492,6 +500,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
/// perm_pool_.
std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
/// Timer for materializing rows. This ignores time getting the next buffer.
ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
/// Average and min/max time spent processing the page index for each row group.
RuntimeProfile::SummaryStatsCounter* process_page_index_stats_;

View File

@@ -600,7 +600,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
int64_t LastRowIdxInCurrentPage() const {
DCHECK(!candidate_data_pages_.empty());
int64_t num_rows =
parent_->file_metadata_.row_groups[parent_->group_idx_].num_rows;
parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows;
// Find the next valid page.
int page_idx = candidate_data_pages_[candidate_page_idx_] + 1;
while (page_idx < offset_index_.page_locations.size()) {

View File

@@ -323,8 +323,9 @@ struct THdfsScanNode {
// The conjuncts that are eligible for dictionary filtering.
9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
// The byte offset of the slot for counter if count star optimization is enabled.
10: optional i32 count_star_slot_offset
// The byte offset of the slot for Parquet metadata if Parquet count star optimization
// is enabled.
10: optional i32 parquet_count_star_slot_offset
// If true, the backend only needs to return one row per partition.
11: optional bool is_partition_key_scan

View File

@@ -89,7 +89,6 @@ import org.apache.impala.thrift.TScanRangeLocationList;
import org.apache.impala.thrift.TScanRangeSpec;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.thrift.TTableStats;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.BitUtil;
import org.apache.impala.util.ExecutorMembershipSnapshot;
import org.apache.impala.util.MathUtil;
@@ -335,8 +334,6 @@ public class HdfsScanNode extends ScanNode {
// Used only to display EXPLAIN information.
private final List<Expr> partitionConjuncts_;
private boolean isFullAcidTable_ = false;
/**
* Construct a node to scan given data files into tuples described by 'desc',
* with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -357,8 +354,6 @@ public class HdfsScanNode extends ScanNode {
tableNumRowsHint_ = hdfsTblRef.getTableNumRowsHint();
FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable();
Preconditions.checkState(tbl_ == hdfsTable);
isFullAcidTable_ =
AcidUtils.isFullAcidTable(hdfsTable.getMetaStoreTable().getParameters());
StringBuilder error = new StringBuilder();
aggInfo_ = aggInfo;
skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error);
@@ -403,14 +398,13 @@ public class HdfsScanNode extends ScanNode {
}
/**
* Returns true if the count(*) optimization can be applied to the query block
* Returns true if the Parquet count(*) optimization can be applied to the query block
* of this scan node.
*/
protected boolean canApplyCountStarOptimization(Analyzer analyzer,
Set<HdfsFileFormat> fileFormats) {
if (fileFormats.size() != 1) return false;
if (isFullAcidTable_) return false;
if (!hasParquet(fileFormats) && !hasOrc(fileFormats)) return false;
if (!hasParquet(fileFormats)) return false;
return canApplyCountStarOptimization(analyzer);
}
@@ -1563,13 +1557,6 @@ public class HdfsScanNode extends ScanNode {
numRangesAdjusted :
Math.min(inputCardinality_, numRangesAdjusted);
}
if (countStarSlot_ != null) {
// We are doing optimized count star. Override cardinality with total num files.
long totalFiles = sumValues(totalFilesPerFs_);
inputCardinality_ = totalFiles;
cardinality_ = totalFiles;
}
if (LOG.isTraceEnabled()) {
LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_));
}
@@ -1867,7 +1854,8 @@ public class HdfsScanNode extends ScanNode {
msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null));
if (countStarSlot_ != null) {
msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
msg.hdfs_scan_node.setParquet_count_star_slot_offset(
countStarSlot_.getByteOffset());
}
if (!statsConjuncts_.isEmpty()) {
for (Expr e: statsConjuncts_) {

View File

@@ -1343,11 +1343,6 @@ public class PlannerTest extends PlannerTestBase {
runPlannerTestFile("tpcds-dist-method", "tpcds");
}
@Test
public void testOrcStatsAgg() {
runPlannerTestFile("orc-stats-agg");
}
/**
* Test new hint of 'TABLE_NUM_ROWS'
*/

View File

@@ -1,426 +0,0 @@
# Verify that the ORC count(*) optimization is applied in all count(*) or
# count(<literal>) cases when scanning an ORC table. In the last case, we are scanning
# a text table, so the optimization is not applied. The optimization is observed when
# the cardinality of the ORC scan (24) is the same as the # of files (24).
select count(*) from functional_orc_def.uncomp_src_alltypes
union all
select count(1) from functional_orc_def.uncomp_src_alltypes
union all
select count(123) from functional_orc_def.uncomp_src_alltypes
union all
select count(*) from functional.alltypes
---- PLAN
PLAN-ROOT SINK
|
00:UNION
| pass-through-operands: all
| row-size=8B cardinality=4
|
|--08:AGGREGATE [FINALIZE]
| | output: count(*)
| | row-size=8B cardinality=1
| |
| 07:SCAN HDFS [functional.alltypes]
| HDFS partitions=24/24 files=24 size=478.45KB
| row-size=0B cardinality=7.30K
|
|--06:AGGREGATE [FINALIZE]
| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| | row-size=8B cardinality=1
| |
| 05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=4B cardinality=24
|
|--04:AGGREGATE [FINALIZE]
| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| | row-size=8B cardinality=1
| |
| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=4B cardinality=24
|
02:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| row-size=8B cardinality=1
|
01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=24
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
00:UNION
| pass-through-operands: all
| row-size=8B cardinality=4
|
|--16:AGGREGATE [FINALIZE]
| | output: count:merge(*)
| | row-size=8B cardinality=1
| |
| 15:EXCHANGE [UNPARTITIONED]
| |
| 08:AGGREGATE
| | output: count(*)
| | row-size=8B cardinality=1
| |
| 07:SCAN HDFS [functional.alltypes]
| HDFS partitions=24/24 files=24 size=478.45KB
| row-size=0B cardinality=7.30K
|
|--14:AGGREGATE [FINALIZE]
| | output: count:merge(*)
| | row-size=8B cardinality=1
| |
| 13:EXCHANGE [UNPARTITIONED]
| |
| 06:AGGREGATE
| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| | row-size=8B cardinality=1
| |
| 05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=4B cardinality=24
|
|--12:AGGREGATE [FINALIZE]
| | output: count:merge(*)
| | row-size=8B cardinality=1
| |
| 11:EXCHANGE [UNPARTITIONED]
| |
| 04:AGGREGATE
| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| | row-size=8B cardinality=1
| |
| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=4B cardinality=24
|
10:AGGREGATE [FINALIZE]
| output: count:merge(*)
| row-size=8B cardinality=1
|
09:EXCHANGE [UNPARTITIONED]
|
02:AGGREGATE
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| row-size=8B cardinality=1
|
01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=24
====
# Verify that the ORC count(*) optimization is applied even if there is more than
# one item in the select list.
select count(*), count(1), count(123) from functional_orc_def.uncomp_src_alltypes
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=24
====
# Select count(<partition col>) - the optimization is disabled because it's not a
# count(<literal>) or count(*) aggregate function.
select count(year) from functional_orc_def.uncomp_src_alltypes
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(`year`)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=13.07K
====
# Group by partition columns.
select month, count(*) from functional_orc_def.uncomp_src_alltypes group by month, year
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| group by: `month`, `year`
| row-size=16B cardinality=24
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=16B cardinality=24
====
# The optimization is disabled because tinyint_col is not a partition col.
select tinyint_col, count(*) from functional_orc_def.uncomp_src_alltypes group by tinyint_col, year
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| group by: tinyint_col, `year`
| row-size=13B cardinality=13.07K
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=5B cardinality=13.07K
====
# The optimization is disabled because it can not be applied to the 1st aggregate
# function.
select avg(year), count(*) from functional_orc_def.uncomp_src_alltypes
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: avg(`year`), count(*)
| row-size=16B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=13.07K
====
# Optimization is not applied because the inner count(*) is not materialized. The outer
# count(*) does not reference a base table.
select count(*) from (select count(*) from functional_orc_def.uncomp_src_alltypes) t
---- PLAN
PLAN-ROOT SINK
|
02:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
|
01:AGGREGATE [FINALIZE]
| row-size=0B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
partition key scan
row-size=0B cardinality=24
====
# The optimization is applied if count(*) is in the having clause.
select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| having: count(*) > 1
| row-size=8B cardinality=0
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=24
====
# The count(*) optimization is applied in the inline view.
select count(*), count(a) from (select count(1) as a from functional_orc_def.uncomp_src_alltypes) t
---- PLAN
PLAN-ROOT SINK
|
02:AGGREGATE [FINALIZE]
| output: count(*), count(count(*))
| row-size=16B cardinality=1
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=24
====
# The count(*) optimization is applied to the inline view even if there is a join.
select *
from functional.alltypes x inner join (
select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year
) t on x.id = t.a;
---- PLAN
PLAN-ROOT SINK
|
03:HASH JOIN [INNER JOIN]
| hash predicates: x.id = count(*)
| runtime filters: RF000 <- count(*)
| row-size=101B cardinality=2
|
|--02:AGGREGATE [FINALIZE]
| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| | group by: `year`
| | row-size=12B cardinality=2
| |
| 01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=12B cardinality=24
|
00:SCAN HDFS [functional.alltypes x]
HDFS partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> x.id
row-size=89B cardinality=7.30K
====
# The count(*) optimization is not applied if there is more than 1 table ref.
select count(*) from functional_orc_def.uncomp_src_alltypes a, functional_orc_def.uncomp_src_alltypes b
---- PLAN
PLAN-ROOT SINK
|
03:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
|
02:NESTED LOOP JOIN [CROSS JOIN]
| row-size=0B cardinality=170.85M
|
|--01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes b]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=0B cardinality=13.07K
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes a]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=0B cardinality=13.07K
====
# The count(*) optimization is applied if all predicates are on partition columns only.
select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8;
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
partition predicates: `year` < 2010, `month` > 8
HDFS partitions=4/24 files=4 size=33.53KB
row-size=8B cardinality=4
====
# tinyint_col is not a partition column so the optimization is disabled.
select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and tinyint_col > 8;
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
partition predicates: `year` < 2010
HDFS partitions=12/24 files=12 size=102.74KB
predicates: tinyint_col > 8
row-size=1B cardinality=654
====
# Optimization is applied after constant folding.
select count(1 + 2 + 3) from functional_orc_def.uncomp_src_alltypes
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=4B cardinality=24
====
# Optimization is not applied to count(null).
select count(1 + null + 3) from functional_orc_def.uncomp_src_alltypes
union all
select count(null) from functional_orc_def.uncomp_src_alltypes
---- PLAN
PLAN-ROOT SINK
|
00:UNION
| pass-through-operands: all
| row-size=8B cardinality=2
|
|--04:AGGREGATE [FINALIZE]
| | output: count(NULL)
| | row-size=8B cardinality=1
| |
| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
| HDFS partitions=24/24 files=24 size=205.47KB
| row-size=0B cardinality=13.07K
|
02:AGGREGATE [FINALIZE]
| output: count(NULL + 3)
| row-size=8B cardinality=1
|
01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=0B cardinality=13.07K
====
# Optimization is not applied when selecting from an empty table.
select count(*) from functional_orc_def.emptytable
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=0
|
00:SCAN HDFS [functional_orc_def.emptytable]
partitions=0/0 files=0 size=0B
row-size=0B cardinality=0
====
# Optimization is not applied when all partitions are pruned.
select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=0
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
partition predicates: `year` = -1
partitions=0/24 files=0 size=0B
row-size=0B cardinality=0
====
# Optimization is not applied across query blocks, even though it would be correct here.
select count(*) from (select int_col from functional_orc_def.uncomp_src_alltypes) t
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=0B cardinality=13.07K
====
# In general, optimization is not applied when there is a distinct agg.
select count(*), count(distinct 1) from functional_orc_def.uncomp_src_alltypes
---- PLAN
PLAN-ROOT SINK
|
02:AGGREGATE [FINALIZE]
| output: count(1), count:merge(*)
| row-size=16B cardinality=1
|
01:AGGREGATE
| output: count(*)
| group by: 1
| row-size=9B cardinality=1
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
HDFS partitions=24/24 files=24 size=205.47KB
row-size=0B cardinality=13.07K
====
# The optimization is applied here because only the count(*) and a partition column are
# materialized. Non-materialized agg exprs are ignored.
select year, cnt from (
select year, count(bigint_col), count(*) cnt, avg(int_col)
from functional_orc_def.uncomp_src_alltypes
where month=1
group by year
) t
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows)
| group by: `year`
| row-size=12B cardinality=2
|
00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
partition predicates: `month` = 1
HDFS partitions=2/24 files=2 size=17.07KB
row-size=12B cardinality=2
====

View File

@@ -1,7 +1,6 @@
# Verify that the parquet count(*) optimization is applied in all count(*) or
# count(<literal>) cases when scanning a Parquet table. In the last case, we are scanning
# a text table, so the optimization is not applied. The optimization is observed when
# the cardinality of the Parquet scan (24) is the same as the # of files (24).
# a text table, so the optimization is not applied.
select count(*) from functional_parquet.alltypes
union all
select count(1) from functional_parquet.alltypes
@@ -30,7 +29,7 @@ PLAN-ROOT SINK
| |
| 05:SCAN HDFS [functional_parquet.alltypes]
| HDFS partitions=24/24 files=24 size=200.45KB
| row-size=8B cardinality=24
| row-size=8B cardinality=12.75K
|
|--04:AGGREGATE [FINALIZE]
| | output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
@@ -38,7 +37,7 @@ PLAN-ROOT SINK
| |
| 03:SCAN HDFS [functional_parquet.alltypes]
| HDFS partitions=24/24 files=24 size=200.45KB
| row-size=8B cardinality=24
| row-size=8B cardinality=12.75K
|
02:AGGREGATE [FINALIZE]
| output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
@@ -46,7 +45,7 @@ PLAN-ROOT SINK
|
01:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=8B cardinality=24
row-size=8B cardinality=12.75K
---- DISTRIBUTEDPLAN
PLAN-ROOT SINK
|
@@ -80,7 +79,7 @@ PLAN-ROOT SINK
| |
| 05:SCAN HDFS [functional_parquet.alltypes]
| HDFS partitions=24/24 files=24 size=200.45KB
| row-size=8B cardinality=24
| row-size=8B cardinality=12.75K
|
|--12:AGGREGATE [FINALIZE]
| | output: count:merge(*)
@@ -94,7 +93,7 @@ PLAN-ROOT SINK
| |
| 03:SCAN HDFS [functional_parquet.alltypes]
| HDFS partitions=24/24 files=24 size=200.45KB
| row-size=8B cardinality=24
| row-size=8B cardinality=12.75K
|
10:AGGREGATE [FINALIZE]
| output: count:merge(*)
@@ -108,7 +107,7 @@ PLAN-ROOT SINK
|
01:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=8B cardinality=24
row-size=8B cardinality=12.75K
====
# Verify that the parquet count(*) optimization is applied even if there is more than
# one item in the select list.
@@ -122,7 +121,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=8B cardinality=24
row-size=8B cardinality=12.75K
====
# Select count(<partition col>) - the optimization should be disabled because it's not a
# count(<literal>) or count(*) aggregate function.
@@ -150,7 +149,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=16B cardinality=24
row-size=16B cardinality=12.75K
====
# The optimization is disabled because tinyint_col is not a partition col.
select tinyint_col, count(*) from functional_parquet.alltypes group by tinyint_col, year
@@ -209,7 +208,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=8B cardinality=24
row-size=8B cardinality=12.75K
====
# The count(*) optimization is applied in the inline view.
select count(*), count(a) from (select count(1) as a from functional_parquet.alltypes) t
@@ -226,7 +225,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=8B cardinality=24
row-size=8B cardinality=12.75K
====
# The count(*) optimization is applied to the inline view even if there is a join.
select *
@@ -248,7 +247,7 @@ PLAN-ROOT SINK
| |
| 01:SCAN HDFS [functional_parquet.alltypes]
| HDFS partitions=24/24 files=24 size=200.45KB
| row-size=12B cardinality=24
| row-size=12B cardinality=12.75K
|
00:SCAN HDFS [functional.alltypes x]
HDFS partitions=24/24 files=24 size=478.45KB
@@ -287,7 +286,7 @@ PLAN-ROOT SINK
00:SCAN HDFS [functional_parquet.alltypes]
partition predicates: `year` < 2010, `month` > 8
HDFS partitions=4/24 files=4 size=33.53KB
row-size=8B cardinality=4
row-size=8B cardinality=2.13K
====
# tinyint_col is not a partition column so the optimization is disabled.
select count(1) from functional_parquet.alltypes where year < 2010 and tinyint_col > 8;
@@ -315,7 +314,7 @@ PLAN-ROOT SINK
|
00:SCAN HDFS [functional_parquet.alltypes]
HDFS partitions=24/24 files=24 size=200.45KB
row-size=8B cardinality=24
row-size=8B cardinality=12.75K
====
# Optimization is not applied to count(null).
select count(1 + null + 3) from functional_parquet.alltypes
@@ -421,5 +420,5 @@ PLAN-ROOT SINK
00:SCAN HDFS [functional_parquet.alltypes]
partition predicates: `month` = 1
HDFS partitions=2/24 files=2 size=17.07KB
row-size=12B cardinality=2
row-size=12B cardinality=1.09K
====

View File

@@ -1869,7 +1869,6 @@ select count(*) from tpch_parquet.lineitem
---- PLAN
Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
Per-Host Resource Estimates: Memory=10MB
Codegen disabled by planner
Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1891,12 +1890,11 @@ PLAN-ROOT SINK
columns: all
extrapolated-rows=disabled max-scan-range-rows=2.14M
mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
tuple-ids=0 row-size=8B cardinality=3
tuple-ids=0 row-size=8B cardinality=6.00M
in pipelines: 00(GETNEXT)
---- DISTRIBUTEDPLAN
Max Per-Host Resource Reservation: Memory=128.00KB Threads=3
Per-Host Resource Estimates: Memory=10MB
Codegen disabled by planner
Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1931,12 +1929,11 @@ Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservat
columns: all
extrapolated-rows=disabled max-scan-range-rows=2.14M
mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
tuple-ids=0 row-size=8B cardinality=3
tuple-ids=0 row-size=8B cardinality=6.00M
in pipelines: 00(GETNEXT)
---- PARALLELPLANS
Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
Per-Host Resource Estimates: Memory=80MB
Codegen disabled by planner
Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1971,7 +1968,7 @@ Per-Instance Resources: mem-estimate=80.02MB mem-reservation=128.00KB thread-res
columns: all
extrapolated-rows=disabled max-scan-range-rows=2.14M
mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0
tuple-ids=0 row-size=8B cardinality=3
tuple-ids=0 row-size=8B cardinality=6.00M
in pipelines: 00(GETNEXT)
====
# Sort

View File

@@ -39,7 +39,6 @@ select count(1) from ice_compound_pred_pd;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_compound_pred_pd;
@@ -424,7 +423,6 @@ select count(1) from ice_compound_pred_pd1;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_compound_pred_pd1;

View File

@@ -33,7 +33,6 @@ select count(1) from ice_pred_pd1;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_pred_pd1;
@@ -63,6 +62,7 @@ aggregation(SUM, NumRowGroups): 3
---- QUERY
# Filtering only on a partition column is done by Iceberg, so in Impala we can get the
# results simply from the file metadata.
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 3.
select
count(1)
from
@@ -72,8 +72,7 @@ where
---- RESULTS
9
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 3
aggregation(SUM, NumRowGroups): 3
====
---- QUERY
# The IN predicate matches two row groups
@@ -447,6 +446,7 @@ aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# NOT_IN could be answered using file metadata if only partition cols are included
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 1.
select
count(1)
from
@@ -456,8 +456,7 @@ where
---- RESULTS
3
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 1
aggregation(SUM, NumRowGroups): 1
====
---- QUERY
# NOT_IN does not work because col_dt is not the partition column
@@ -548,7 +547,6 @@ select count(1) from ice_pred_pd2;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_pred_pd2;

View File

@@ -35,7 +35,6 @@ select count(1) from ice_is_null_pred_pd;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_is_null_pred_pd;

View File

@@ -170,18 +170,17 @@ select count(*) from ice_bigints;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
# When filtering only by a partition column, Iceberg can do the filtering and Impala does not need to read any data.
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 1.
select count(*) from ice_bigints
where i = 0 and j = 0;
---- RESULTS
1217
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumRowGroups): 1
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 1
====
---- QUERY
# When more than just partition columns are involved in the filtering, Impala has to read data to answer the query.
@@ -281,6 +280,7 @@ select count(*) from alltypes_part;
Expression 'timestamp_col' (type: TIMESTAMP) would need to be cast to STRING for column 'string_col'
====
---- QUERY
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 4.
select count(*) from alltypes_part
where bool_col = true;
---- RESULTS
@@ -288,10 +288,10 @@ where bool_col = true;
---- TYPES
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, NumRowGroups): 4
====
---- QUERY
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 4.
select count(*) from alltypes_part
where float_col = 0;
---- RESULTS
@@ -299,10 +299,10 @@ where float_col = 0;
---- TYPES
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, NumRowGroups): 4
====
---- QUERY
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 4.
select count(*) from alltypes_part
where double_col = 0;
---- RESULTS
@@ -310,10 +310,10 @@ where double_col = 0;
---- TYPES
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, NumRowGroups): 4
====
---- QUERY
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 2.
select count(*) from alltypes_part
where date_col = '2009-01-01';
---- RESULTS
@@ -321,10 +321,10 @@ where date_col = '2009-01-01';
---- TYPES
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 2
aggregation(SUM, NumRowGroups): 2
====
---- QUERY
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 4.
select count(*) from alltypes_part
where string_col = '0';
---- RESULTS
@@ -332,8 +332,7 @@ where string_col = '0';
---- TYPES
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, NumRowGroups): 4
====
---- QUERY
# 'timestamp_col' is not a partitioning column, so min/max stats will not be used to

View File

@@ -13,7 +13,6 @@ select count(*) from ice_tbl;
0
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
insert into
@@ -34,7 +33,6 @@ select count(*) from ice_tbl;
3
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
create table ice_tbl_u1 stored as iceberg as select * from ice_tbl;
@@ -47,7 +45,6 @@ select count(*) from ice_tbl_u1;
3
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
insert into
@@ -69,7 +66,6 @@ select count(*) from ice_tbl;
6
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
create table ice_tbl_u2 stored as iceberg as select * from ice_tbl;
@@ -82,7 +78,6 @@ select count(*) from ice_tbl_u2;
6
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
insert into
@@ -105,7 +100,6 @@ select count(*) from ice_tbl;
8
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
select count(*) from ice_tbl for system_time as of now();
@@ -113,7 +107,6 @@ select count(*) from ice_tbl for system_time as of now();
8
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
set explain_level=3;
@@ -145,6 +138,7 @@ explain select 123, count(*), 321 from ice_tbl;
====
---- QUERY
# Filtering by a partition column results in Iceberg doing the filtering instead of Impala.
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 2.
select
count(*)
from
@@ -154,8 +148,7 @@ where
---- RESULTS
4
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 2
aggregation(SUM, NumRowGroups): 2
====
---- QUERY
select
@@ -167,7 +160,6 @@ having
---- RESULTS
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
select
@@ -181,7 +173,6 @@ group by
4
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
select
@@ -192,7 +183,6 @@ from
6
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
truncate ice_tbl;
@@ -205,7 +195,6 @@ select count(*) from ice_tbl;
0
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
create table parq_tbl(col_i INT, col_s STRING) PARTITIONED BY(x INT) STORED AS PARQUET;
@@ -213,6 +202,7 @@ create table parq_tbl(col_i INT, col_s STRING) PARTITIONED BY(x INT) STORED AS P
'Table has been created.'
====
---- QUERY
# IMPALA-11123: Behavior changed after the revert: NumRowGroups changed from 0 to 3.
insert into parq_tbl PARTITION(x = 12340) values (0, "a");
insert into parq_tbl PARTITION(x = 12341) values (1, "b");
insert into parq_tbl PARTITION(x = 12342) values (2, "c");
@@ -220,8 +210,7 @@ select count(*) from parq_tbl;
---- RESULTS
3
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 3
aggregation(SUM, NumRowGroups): 3
====
---- QUERY
select count(*) as c from ice_tbl_u1 union all (select count(*) c from ice_tbl_u2) order by c;
@@ -232,7 +221,6 @@ select count(*) as c from ice_tbl_u1 union all (select count(*) c from ice_tbl_u
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
with u1 as (select count(*) from ice_tbl_u1), u2 as (select count(*) from ice_tbl_u2) select * from u1, u2;
@@ -242,7 +230,6 @@ with u1 as (select count(*) from ice_tbl_u1), u2 as (select count(*) from ice_tb
BIGINT,BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
with u1 as (select count(*) from ice_tbl_u1),
@@ -254,5 +241,4 @@ u2 as (select count(*) from ice_tbl_u1 union all (select count(*) from ice_tbl_u
BIGINT,BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
====

View File

@@ -20,7 +20,6 @@ select count(*) from ice_types1;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_types1;
@@ -252,7 +251,6 @@ select count(*) from ice_types2;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_types2;
@@ -370,7 +368,6 @@ select count(*) from ice_types3;
BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
show files in ice_types3;

View File

@@ -20,7 +20,6 @@ BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 4
aggregation(SUM, NumOrcStripes): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
with u1 as (select count(*) as c from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files),
@@ -35,7 +34,6 @@ BIGINT,BIGINT,TINYINT,BIGINT,BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 4
aggregation(SUM, NumOrcStripes): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
select count(*) as c from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945
@@ -58,7 +56,6 @@ BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumOrcStripes): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
with u1 as (select count(*) as c from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945),
@@ -73,5 +70,4 @@ BIGINT,BIGINT,TINYINT,BIGINT,BIGINT
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumOrcStripes): 2
aggregation(SUM, NumFileMetadataRead): 0
====
====

View File

@@ -39,7 +39,6 @@ SELECT count(*) from iceberg_v2_no_deletes_orc
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_positional_delete_all_rows_orc
@@ -81,7 +80,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc for system_versio
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc
@@ -91,7 +89,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files_orc
@@ -145,7 +142,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_orc for system_version as of 5003445199566617082
@@ -155,7 +151,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_orc
@@ -165,7 +160,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_partitioned_position_deletes_orc
@@ -207,7 +201,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc for system_vers
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc
@@ -217,7 +210,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 6
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_no_deletes_orc where i = 2;

View File

@@ -39,7 +39,6 @@ SELECT count(*) from iceberg_v2_no_deletes
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_delete_positional
@@ -81,7 +80,6 @@ SELECT count(*) from iceberg_v2_delete_positional for system_version as of 68169
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_delete_positional;
@@ -91,7 +89,6 @@ SELECT count(*) from iceberg_v2_delete_positional;
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_positional_delete_all_rows
@@ -133,7 +130,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows for system_version as
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_delete_all_rows
@@ -143,7 +139,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files
@@ -197,7 +192,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945
@@ -207,7 +201,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files
@@ -217,7 +210,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 4
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_positional_update_all_rows
@@ -268,7 +260,6 @@ SELECT count(*) from iceberg_v2_positional_update_all_rows for system_version as
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_positional_update_all_rows
@@ -278,7 +269,6 @@ SELECT count(*) from iceberg_v2_positional_update_all_rows
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 2
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
COMPUTE STATS iceberg_v2_partitioned_position_deletes
@@ -320,7 +310,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes for system_version
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_partitioned_position_deletes
@@ -330,7 +319,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 6
aggregation(SUM, NumFileMetadataRead): 0
====
---- QUERY
SELECT count(*) from iceberg_v2_no_deletes where i = 2;

View File

@@ -25,17 +25,16 @@ bigint, bigint
280,1260
====
---- QUERY
# IMPALA-11123: IMPALA-5861 added this test to verify that the 'RowsRead' counter is not
# double counted for zero slot scans. IMPALA-11123 removed the increment of the 'RowsRead'
# counter for optimized count(star) and zero slot scan queries. This reduces the
# 'RowsRead' value from 1200 to 900, since the other 300 rows are served through the
# zero slot scan. We do not verify 'NumFileMetadataRead' since it does not stay the same
# across different test vector permutations.
# IMPALA-5861: RowsRead counter should be accurate for table scan that returns
# zero slots. This test is run with various batch_size values, which helps
# reproduce the bug. Scanning multiple file formats triggers the bug because
# the Parquet count(*) rewrite is disabled when non-Parquet file formats are
# present.
select count(*) from functional.alltypesmixedformat
---- TYPES
bigint
---- RESULTS
1200
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 900
aggregation(SUM, RowsRead): 1200
====
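The comments above boil down to simple counter arithmetic. Below is a minimal sketch (plain Python, not part of the Impala test framework) of the old and the restored RowsRead expectations, assuming, as the removed comment states, a 1200-row mixed-format table whose 300 Parquet rows used to be answered from file metadata:

# Illustrative only: the arithmetic behind the old and the restored RowsRead assertions.
total_rows = 1200                      # count(*) result for functional.alltypesmixedformat
parquet_rows_from_metadata = 300       # rows served by the (now reverted) zero slot scan path
assert total_rows - parquet_rows_from_metadata == 900  # previous aggregation(SUM, RowsRead)
assert total_rows == 1200                              # restored aggregation(SUM, RowsRead)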

View File

@@ -1,152 +0,0 @@
====
---- QUERY
# Tests the correctness of the ORC count(*) optimization.
select count(1)
from functional_orc_def.uncomp_src_alltypes
---- RESULTS
7300
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Tests the correctness of zero slot scan over ORC.
# Does not verify 'NumFileMetadataRead' here since codegen vs non-codegen yield
# different numbers.
select 1 from functional_orc_def.alltypestiny
---- RESULTS
1
1
1
1
1
1
1
1
---- TYPES
tinyint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# ORC count(*) optimization with predicates on the partition columns.
select count(1)
from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
---- RESULTS
1220
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# ORC count(*) optimization with group by partition columns.
select year, month, count(1)
from functional_orc_def.uncomp_src_alltypes group by year, month
---- RESULTS
2009,1,310
2009,2,280
2009,3,310
2009,4,300
2009,5,310
2009,6,300
2009,7,310
2009,8,310
2009,9,300
2009,10,310
2009,11,300
2009,12,310
2010,1,310
2010,2,280
2010,3,310
2010,4,300
2010,5,310
2010,6,300
2010,7,310
2010,8,310
2010,9,300
2010,10,310
2010,11,300
2010,12,310
---- TYPES
int, int, bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# ORC count(*) optimization with both group by and predicates on partition columns.
select count(1)
from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
group by month
---- RESULTS
310
300
310
300
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# ORC count(*) optimization with the result going into a join.
select x.bigint_col from functional_orc_def.uncomp_src_alltypes x
inner join (
select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year
) t on x.id = t.a;
---- RESULTS
0
0
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 24
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 7300
=====
---- QUERY
# ORC count(*) optimization with the agg function in the having clause.
select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
---- RESULTS
1
---- TYPES
tinyint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 0
====
---- QUERY
# Verify that 0 is returned for count(*) on an empty table.
select count(1) from functional_orc_def.emptytable
---- RESULTS
0
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 0
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Verify that 0 is returned when all partitions are pruned.
select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
---- RESULTS
0
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumOrcStripes): 0
aggregation(SUM, NumFileMetadataRead): 0
aggregation(SUM, RowsRead): 0
=====
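As a quick consistency check of the expected results in the deleted ORC count(*) tests above: the per-month counts from the group-by case repeat for both years and add up to the plain count(1) total, and the last four months of 2009 add up to the filtered total. A short illustrative snippet (plain Python, not part of the test file format):

# Per-month row counts for one year, as listed in the group-by test above.
per_month = [310, 280, 310, 300, 310, 300, 310, 310, 300, 310, 300, 310]
assert sum(per_month) * 2 == 7300    # two years (2009 and 2010) with identical month counts
assert sum(per_month[8:]) == 1220    # months 9-12 of 2009, i.e. year < 2010 and month > 8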

View File

@@ -7,30 +7,6 @@ from functional_parquet.alltypes
7300
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Tests the correctness of zero slot scan over Parquet.
# Not checking 'NumFileMetadataRead' here since codegen vs non-codegen yield
# different numbers.
select 1 from functional_orc_def.alltypestiny
---- RESULTS
1
1
1
1
1
1
1
1
---- TYPES
tinyint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Parquet count(*) optimization with predicates on the partition columns.
@@ -40,10 +16,6 @@ from functional_parquet.alltypes where year < 2010 and month > 8
1220
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Parquet count(*) optimization with group by partition columns.
@@ -76,10 +48,6 @@ from functional_parquet.alltypes group by year, month
2010,12,310
---- TYPES
int, int, bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Parquet count(*) optimization with both group by and predicates on partition columns.
@@ -93,10 +61,6 @@ group by month
300
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 4
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Parquet count(*) optimization with the result going into a join.
@@ -109,10 +73,6 @@ select x.bigint_col from functional.alltypes x
0
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 7300
=====
---- QUERY
# Parquet count(*) optimization with the agg function in the having clause.
@@ -121,10 +81,6 @@ select 1 from functional_parquet.alltypes having count(*) > 1
1
---- TYPES
tinyint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 24
aggregation(SUM, RowsRead): 0
====
---- QUERY
# Verify that 0 is returned for count(*) on an empty table.
@@ -133,10 +89,6 @@ select count(1) from functional_parquet.emptytable
0
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Verify that 0 is returned when all partitions are pruned.
@@ -145,10 +97,6 @@ select count(1) from functional_parquet.alltypes where year = -1
0
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 0
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# Test different row group size combinations.
@@ -166,9 +114,6 @@ select count(*) from tpch_parquet.lineitem
6001215
---- TYPES
bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, RowsRead): 0
=====
---- QUERY
# IMPALA-5679: Count(*) with group by on a string partition column.
@@ -191,8 +136,4 @@ select string_col, count(*) from $DATABASE.string_partitioned_table group by str
'9',730
---- TYPES
string, bigint
---- RUNTIME_PROFILE
aggregation(SUM, NumRowGroups): 0
aggregation(SUM, NumFileMetadataRead): 10
aggregation(SUM, RowsRead): 0
=====

View File

@@ -11,10 +11,6 @@ INT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 24
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 24
====
---- QUERY
# Test with more complex multiple distinct aggregation.
@@ -27,10 +23,6 @@ BIGINT,BIGINT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 24
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 24
====
---- QUERY
# Distinct aggregation with multiple columns.
@@ -66,10 +58,6 @@ INT,INT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 24
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 24
====
---- QUERY
# Partition key scan combined with analytic function.
@@ -83,10 +71,6 @@ INT,BIGINT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 24
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 24
====
---- QUERY
# Partition scan combined with sort.
@@ -123,10 +107,6 @@ INT,INT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 24
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 24
====
---- QUERY
# Partition key scan combined with predicate on partition columns
@@ -141,10 +121,6 @@ INT,INT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 2
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 2
====
---- QUERY
# Partition key scan combined with having predicate.
@@ -160,10 +136,6 @@ INT,INT
---- RUNTIME_PROFILE
# Confirm that only one row per file is read.
aggregation(SUM, RowsRead): 24
---- RUNTIME_PROFILE: table_format=parquet,orc
# Confirm that only one metadata read is done per file.
aggregation(SUM, RowsRead): 0
aggregation(SUM, NumFileMetadataRead): 24
====
---- QUERY
# Empty table should not return any rows

View File

@@ -15,10 +15,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
620
---- RUNTIME_PROFILE
row_regex: .*RowsRead: 2.43K .*
---- RUNTIME_PROFILE: table_format=parquet,orc
row_regex: .*RowsReturned: 2.43K .*
aggregation(SUM, RowsRead): 2
aggregation(SUM, NumFileMetadataRead): 48
====
---- QUERY
# Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -53,10 +49,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
620
---- RUNTIME_PROFILE
row_regex: .*RowsRead: 2.43K .*
---- RUNTIME_PROFILE: table_format=parquet,orc
row_regex: .*RowsReturned: 2.43K .*
aggregation(SUM, RowsRead): 2
aggregation(SUM, NumFileMetadataRead): 48
====
---- QUERY
# Shuffle join, global mode. Expect filters to be propagated.

View File

@@ -15,10 +15,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
620
---- RUNTIME_PROFILE
row_regex: .*RowsRead: 2.43K .*
---- RUNTIME_PROFILE: table_format=parquet,orc
row_regex: .*RowsReturned: 2.43K .*
aggregation(SUM, RowsRead): 2
aggregation(SUM, NumFileMetadataRead): 48
====
---- QUERY
# Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -53,10 +49,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
620
---- RUNTIME_PROFILE
row_regex: .*RowsRead: 2.43K .*
---- RUNTIME_PROFILE: table_format=parquet,orc
row_regex: .*RowsReturned: 2.43K .*
aggregation(SUM, RowsRead): 2
aggregation(SUM, NumFileMetadataRead): 48
====
---- QUERY
# Shuffle join, global mode. Expect filters to be propagated.

View File

@@ -238,9 +238,6 @@ tinyint
1
---- RUNTIME_PROFILE
aggregation(SUM, RowsRead): 100
---- RUNTIME_PROFILE: table_format=parquet,orc
aggregation(SUM, RowsRead): 0
aggregation(SUM, RowsReturned): 200
====
---- QUERY
select year, count(*) from alltypes group by year

View File

@@ -771,7 +771,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
different number of executors and memory limit in each."""
# A small query with estimated memory per host of 10MB that can run on the small
# executor group
SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim where d_year=2022;"
SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;"
# A large query with estimated memory per host of 132MB that can only run on
# the large executor group.
LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;"

View File

@@ -79,11 +79,6 @@ class TestQueryRetries(CustomClusterTestSuite):
union all
select count(*) from functional.alltypes where bool_col = sleep(50)"""
# A simple count query with predicate. The predicate is needed so that the planner does
# not create the optimized count(star) query plan.
_count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
_count_query_result = "55"
@classmethod
def get_workload(cls):
return 'functional-query'
@@ -257,7 +252,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
killed_impalad = self.__kill_random_impalad()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -269,7 +264,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert self._count_query_result in results.data[0]
assert "6001215" in results.data[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -317,7 +312,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# and the query should be retried. Add delay before admission so that the 2nd node
# is removed from the blacklist before the scheduler makes the schedule for the retried
# query.
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -330,7 +325,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert self._count_query_result in results.data[0]
assert "6001215" in results.data[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -380,7 +375,7 @@ class TestQueryRetries(CustomClusterTestSuite):
rpc_not_accessible_impalad = self.cluster.impalads[1]
assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -703,7 +698,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -742,7 +737,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.__wait_until_retry_state(handle, 'RETRYING')
@@ -772,7 +767,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
@@ -796,7 +791,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
self.hs2_client.execute_async(query)
@@ -823,7 +818,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
# The number of in-flight queries is 0 at the beginning, then 1 when the original
# query is submitted. It's 2 when the retried query is registered. Although the retry
@@ -853,7 +848,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
@@ -892,7 +887,7 @@ class TestQueryRetries(CustomClusterTestSuite):
# Kill an impalad, and run a query. The query should be retried.
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
client = self.cluster.get_first_impalad().service.create_beeswax_client()
client.set_configuration({'retry_failed_queries': 'true'})
handle = client.execute_async(query)
@@ -922,7 +917,7 @@ class TestQueryRetries(CustomClusterTestSuite):
"""Test query retries with the HS2 protocol. Enable the results set cache as well and
test that query retries work with the results cache."""
self.cluster.impalads[1].kill()
query = self._count_query
query = "select count(*) from tpch_parquet.lineitem"
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
handle = self.hs2_client.execute_async(query)
@@ -931,7 +926,7 @@ class TestQueryRetries(CustomClusterTestSuite):
results = self.hs2_client.fetch(query, handle)
assert results.success
assert len(results.data) == 1
assert results.data[0] == self._count_query_result
assert int(results.data[0]) == 6001215
# Validate the live exec summary.
retried_query_id = \

View File

@@ -260,6 +260,24 @@ class TestAggregationQueries(ImpalaTestSuite):
# Verify codegen was enabled for all four stages of the aggregation.
assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])
def test_parquet_count_star_optimization(self, vector, unique_database):
if (vector.get_value('table_format').file_format != 'text' or
vector.get_value('table_format').compression_codec != 'none'):
# No need to run this test on all file formats
pytest.skip()
self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
vector.get_value('exec_option')['batch_size'] = 1
self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
def test_kudu_count_star_optimization(self, vector, unique_database):
if (vector.get_value('table_format').file_format != 'text' or
vector.get_value('table_format').compression_codec != 'none'):
# No need to run this test on all file formats
pytest.skip()
self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
vector.get_value('exec_option')['batch_size'] = 1
self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
def test_ndv(self):
"""Test the version of NDV() that accepts a scale value argument against
different column data types. The scale argument is an integer in range
@@ -303,55 +321,17 @@ class TestAggregationQueries(ImpalaTestSuite):
for j in range(0, 11):
assert(ndv_results[i - 1][j] == int(ndv_vals[j]))
def test_grouping_sets(self, vector):
"""Tests for ROLLUP, CUBE and GROUPING SETS."""
if vector.get_value('table_format').file_format == 'hbase':
pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
self.run_test_case('QueryTest/grouping-sets', vector)
class TestAggregationQueriesRunOnce(ImpalaTestSuite):
"""Run the aggregation test suite similarly as TestAggregationQueries, but with stricter
constraint. Each test in this class only run once by setting uncompressed text dimension
for all exploration strategy. However, they may not necessarily target uncompressed text
table format. This also run with codegen enabled and disabled to exercise our
non-codegen code"""
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestAggregationQueriesRunOnce, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
create_exec_option_dimension(disable_codegen_options=[False, True]))
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def test_parquet_count_star_optimization(self, vector, unique_database):
self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
vector.get_value('exec_option')['batch_size'] = 1
self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
def test_kudu_count_star_optimization(self, vector):
self.run_test_case('QueryTest/kudu-stats-agg', vector)
vector.get_value('exec_option')['batch_size'] = 1
self.run_test_case('QueryTest/kudu-stats-agg', vector)
def test_orc_count_star_optimization(self, vector):
self.run_test_case('QueryTest/orc-stats-agg', vector)
vector.get_value('exec_option')['batch_size'] = 1
self.run_test_case('QueryTest/orc-stats-agg', vector)
def test_sampled_ndv(self, vector):
def test_sampled_ndv(self, vector, unique_database):
"""The SAMPLED_NDV() function is inherently non-deterministic and cannot be
reasonably made deterministic with existing options so we test it separately.
The goal of this test is to ensure that SAMPLED_NDV() works on all data types
and returns approximately sensible estimates. It is not the goal of this test
to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is expected to
be inaccurate on small data sets like the ones we use in this test."""
if (vector.get_value('table_format').file_format != 'text' or
vector.get_value('table_format').compression_codec != 'none'):
# No need to run this test on all file formats
pytest.skip()
# NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and SAMPLED_NDV()
# are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() can do.
@@ -405,6 +385,12 @@ class TestAggregationQueriesRunOnce(ImpalaTestSuite):
for i in range(14, 16):
self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
def test_grouping_sets(self, vector):
"""Tests for ROLLUP, CUBE and GROUPING SETS."""
if vector.get_value('table_format').file_format == 'hbase':
pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
self.run_test_case('QueryTest/grouping-sets', vector)
class TestDistinctAggregation(ImpalaTestSuite):
"""Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs

View File

@@ -453,7 +453,6 @@ class TestIcebergTable(IcebergTestSuite):
assert len(data.data) == 1
assert expected in data.data
assert "NumRowGroups" not in data.runtime_profile
assert "NumFileMetadataRead" not in data.runtime_profile
def expect_results_t(ts, expected_results, expected_cols):
expect_results(

View File

@@ -1623,8 +1623,7 @@ class TestOrc(ImpalaTestSuite):
def _misaligned_orc_stripes_helper(
self, table_name, rows_in_table, num_scanners_with_no_reads=0):
"""Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners
that don't read anything because the underlying file is poorly formatted.
Additionally, test that select count(star) matches the expected number of rows.
that don't read anything because the underlying file is poorly formatted
"""
query = 'select * from %s' % table_name
result = self.client.execute(query)
@@ -1645,11 +1644,6 @@ class TestOrc(ImpalaTestSuite):
total += int(n)
assert total == num_scanners_with_no_reads
# Test that select count(star) matches the expected number of rows.
query = 'select count(*) from %s' % table_name
result = self.client.execute(query)
assert int(result.data[0]) == rows_in_table
# Skip this test on non-HDFS filesystems, because orc-type-check.test contains Hive
# queries that hang in some cases (IMPALA-9345). It would be possible to separate
# the tests that use Hive and run most tests on S3, but I think that running these on
@@ -1783,13 +1777,13 @@ class TestOrc(ImpalaTestSuite):
"CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
unique_database, test_name, test_files)
err = self.execute_query_expect_failure(self.client,
"select count(id) from {0}.{1}".format(unique_database, test_name))
"select count(*) from {0}.{1}".format(unique_database, test_name))
assert expected_error in str(err)
def test_invalid_schema(self, vector, unique_database):
"""Test scanning of ORC file with malformed schema."""
self._run_invalid_schema_test(unique_database, "corrupt_schema",
"Encountered parse error in tail of ORC file")
"Encountered parse error during schema selection")
self._run_invalid_schema_test(unique_database, "corrupt_root_type",
"Root of the selected type returned by the ORC lib is not STRUCT: boolean.")

View File

@@ -264,19 +264,15 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True):
# evaluated for all formats that don't have a commented section for this query.
if subsection_name == 'RUNTIME_PROFILE':
if subsection_comment is not None and subsection_comment != "":
allowed_formats = ['kudu', 'parquet', 'orc']
allowed_formats = ['kudu']
if not subsection_comment.startswith("table_format="):
raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form '
'"table_format=FORMAT[,FORMAT2,...]"' % subsection_comment)
parsed_formats = subsection_comment[13:].split(',')
for table_format in parsed_formats:
if table_format not in allowed_formats:
raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' %
(table_format, allowed_formats))
else:
subsection_name_for_format = 'RUNTIME_PROFILE_%s' % table_format
parsed_sections[subsection_name_for_format] = subsection_str
continue
'"table_format=FORMAT"' % subsection_comment)
table_format = subsection_comment[13:]
if table_format not in allowed_formats:
raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' %
(table_format, allowed_formats))
subsection_name = 'RUNTIME_PROFILE_%s' % table_format
parsed_sections[subsection_name] = subsection_str
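For reference, the restored branch above reduces to a small, single-format parser. The following is a self-contained sketch (a hypothetical helper, not the actual test_file_parser.py code) of that behavior: the comment must be exactly "table_format=FORMAT", FORMAT must be in the allowed list, and the section body is stored under RUNTIME_PROFILE_<FORMAT>:

def handle_runtime_profile_comment(comment, section_body, parsed_sections,
                                   allowed_formats=('kudu',)):
    # Mirrors the restored logic shown above: a single format only, no comma-separated list.
    if not comment.startswith("table_format="):
        raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form '
                           '"table_format=FORMAT"' % comment)
    table_format = comment[len("table_format="):]
    if table_format not in allowed_formats:
        raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' %
                           (table_format, list(allowed_formats)))
    parsed_sections['RUNTIME_PROFILE_%s' % table_format] = section_body

sections = {}
handle_runtime_profile_comment("table_format=kudu", "aggregation(SUM, RowsRead): 0", sections)
assert sections == {'RUNTIME_PROFILE_kudu': 'aggregation(SUM, RowsRead): 0'}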