mirror of
https://github.com/apache/impala.git
synced 2026-01-05 21:00:54 -05:00
IMPALA-4794 changed the distinct aggregation behavior to shuffling by both grouping exprs and the distinct expr. It's slower in queries where the NDVs of grouping exprs are high and data are uniformly distributed among groups. This patch adds a query option controlling this behavior, letting users switch to the old plan. Change-Id: Icb4b4576fb29edd62cf4b4ba0719c0e0a2a5a8dc Reviewed-on: http://gerrit.cloudera.org:8080/9949 Reviewed-by: Tianyi Wang <twang@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
422 lines
20 KiB
Python
422 lines
20 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# Validates all aggregate functions across all datatypes
|
|
#
|
|
import pytest
|
|
|
|
from testdata.common import widetable
|
|
from tests.common.impala_test_suite import ImpalaTestSuite
|
|
from tests.common.test_dimensions import (
|
|
create_exec_option_dimension,
|
|
create_exec_option_dimension_from_dict,
|
|
create_uncompressed_text_dimension)
|
|
from tests.common.test_result_verifier import (
|
|
assert_codegen_enabled,
|
|
parse_column_types,
|
|
parse_column_labels,
|
|
QueryTestResult,
|
|
parse_result_rows)
|
|
from tests.common.test_vector import ImpalaTestDimension
|
|
|
|
# Test dimensions for TestAggregation: every aggregate function below is run
# against every column data type below.
AGG_FUNCTIONS = ['sum', 'count', 'min', 'max', 'avg', 'ndv']
DATA_TYPES = [
    'int', 'bool', 'double', 'bigint', 'tinyint',
    'smallint', 'float', 'timestamp', 'string',
]
|
# Lookup table of expected TestAggregation results, keyed '<func>-<type>' for
# plain aggregates and '<func>-distinct-<type>' for DISTINCT aggregates.
result_lut = {
  # tinyint
  'sum-tinyint': 45000, 'avg-tinyint': 5, 'count-tinyint': 9000,
  'min-tinyint': 1, 'max-tinyint': 9, 'ndv-tinyint': 9,
  # smallint
  'sum-smallint': 495000, 'avg-smallint': 50, 'count-smallint': 9900,
  'min-smallint': 1, 'max-smallint': 99, 'ndv-smallint': 99,
  # int
  'sum-int': 4995000, 'avg-int': 500, 'count-int': 9990,
  'min-int': 1, 'max-int': 999, 'ndv-int': 999,
  # bigint
  'sum-bigint': 49950000, 'avg-bigint': 5000, 'count-bigint': 9990,
  'min-bigint': 10, 'max-bigint': 9990, 'ndv-bigint': 999,
  # bool
  'sum-bool': 5000, 'count-bool': 10000, 'min-bool': 'false',
  'max-bool': 'true', 'avg-bool': 0.5, 'ndv-bool': 2,
  # double
  'sum-double': 50449500.0, 'count-double': 9990, 'min-double': 10.1,
  'max-double': 10089.9, 'avg-double': 5050.0, 'ndv-double': 999,
  # float
  'sum-float': 5494500.0, 'count-float': 9990, 'min-float': 1.10,
  'max-float': 1098.9, 'avg-float': 550.0, 'ndv-float': 999,
  # timestamp (sum is not applicable)
  'count-timestamp': 10000, 'min-timestamp': '2010-01-01 00:00:00',
  'max-timestamp': '2010-01-10 18:02:05.100000000',
  'avg-timestamp': '2010-01-05 20:47:11.705080000', 'ndv-timestamp': 10000,
  # string (min/max/count/ndv only)
  'count-string': 10000, 'min-string': '0', 'max-string': '999',
  'ndv-string': 999,
  # DISTINCT variants
  'sum-distinct-tinyint': 45, 'count-distinct-tinyint': 9,
  'min-distinct-tinyint': 1, 'max-distinct-tinyint': 9,
  'avg-distinct-tinyint': 5, 'ndv-distinct-tinyint': 9,
  'sum-distinct-smallint': 4950, 'count-distinct-smallint': 99,
  'min-distinct-smallint': 1, 'max-distinct-smallint': 99,
  'avg-distinct-smallint': 50, 'ndv-distinct-smallint': 99,
  'sum-distinct-int': 499500, 'count-distinct-int': 999,
  'min-distinct-int': 1, 'max-distinct-int': 999,
  'avg-distinct-int': 500, 'ndv-distinct-int': 999,
  'sum-distinct-bigint': 4995000, 'count-distinct-bigint': 999,
  'min-distinct-bigint': 10, 'max-distinct-bigint': 9990,
  'avg-distinct-bigint': 5000, 'ndv-distinct-bigint': 999,
  'sum-distinct-bool': 1, 'count-distinct-bool': 2,
  'min-distinct-bool': 'false', 'max-distinct-bool': 'true',
  'avg-distinct-bool': 0.5, 'ndv-distinct-bool': 2,
  'sum-distinct-double': 5044950.0, 'count-distinct-double': 999,
  'min-distinct-double': 10.1, 'max-distinct-double': 10089.9,
  'avg-distinct-double': 5050.0, 'ndv-distinct-double': 999,
  'sum-distinct-float': 549450.0, 'count-distinct-float': 999,
  'min-distinct-float': 1.1, 'max-distinct-float': 1098.9,
  'avg-distinct-float': 550.0, 'ndv-distinct-float': 999,
  'count-distinct-timestamp': 10000,
  'min-distinct-timestamp': '2010-01-01 00:00:00',
  'max-distinct-timestamp': '2010-01-10 18:02:05.100000000',
  'avg-distinct-timestamp': '2010-01-05 20:47:11.705080000',
  'ndv-distinct-timestamp': 10000,
  'count-distinct-string': 1000, 'min-distinct-string': '0',
  'max-distinct-string': '999', 'ndv-distinct-string': 999,
}
|
|
|
|
class TestAggregation(ImpalaTestSuite):
  """Validates aggregate functions, with and without DISTINCT, across data types.

  Each test vector pairs one function from AGG_FUNCTIONS with one column type
  from DATA_TYPES; actual query results are checked against the precomputed
  expected values in result_lut."""

  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestAggregation, cls).add_test_dimensions()

    # Add two more dimensions
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('agg_func', *AGG_FUNCTIONS))
    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('data_type', *DATA_TYPES))
    cls.ImpalaTestMatrix.add_constraint(lambda v: cls.is_valid_vector(v))

  @classmethod
  def is_valid_vector(cls, vector):
    """Returns True for the (file format, exec options, agg_func, data_type)
    combinations that this test should run on."""
    data_type, agg_func = vector.get_value('data_type'), vector.get_value('agg_func')
    file_format = vector.get_value('table_format').file_format
    if file_format not in ['parquet']: return False

    if cls.exploration_strategy() == 'core':
      # Reduce execution time when exploration strategy is 'core'
      if vector.get_value('exec_option')['batch_size'] != 0: return False

    # Avro doesn't have timestamp type
    non_numeric = data_type in ['bool', 'string']
    if file_format == 'avro' and data_type == 'timestamp':
      return False
    elif non_numeric and agg_func not in ['min', 'max', 'count', 'ndv']:
      return False
    elif agg_func == 'sum' and data_type == 'timestamp':
      return False
    return True

  def test_aggregation(self, vector):
    """Runs the aggregate both plainly and with DISTINCT, verifies the result
    against result_lut and, when codegen is on, checks the expected plan nodes
    were codegen'd."""
    exec_option = vector.get_value('exec_option')
    disable_codegen = exec_option['disable_codegen']
    data_type, agg_func = (vector.get_value('data_type'), vector.get_value('agg_func'))

    query = 'select %s(%s_col) from alltypesagg where day is not null' % (agg_func,
        data_type)
    result = self.execute_query(query, exec_option,
        table_format=vector.get_value('table_format'))
    assert len(result.data) == 1
    # Fixed: dropped stray trailing semicolon from the original.
    self.verify_agg_result(agg_func, data_type, False, result.data[0])

    if not disable_codegen:
      # Verify codegen was enabled for the preaggregation.
      # It is deliberately disabled for the merge aggregation.
      assert_codegen_enabled(result.runtime_profile, [1])

    query = 'select %s(DISTINCT(%s_col)) from alltypesagg where day is not null' % (
        agg_func, data_type)
    result = self.execute_query(query, vector.get_value('exec_option'))
    assert len(result.data) == 1
    self.verify_agg_result(agg_func, data_type, True, result.data[0])

    if not disable_codegen:
      # Verify codegen was enabled for all stages of the aggregation.
      assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])

  def verify_agg_result(self, agg_func, data_type, distinct, actual_string):
    """Compares 'actual_string' (one result row) against the expected value in
    result_lut, with tolerances for the approximate (ndv) and floating point
    cases."""
    key = '%s-%s%s' % (agg_func, 'distinct-' if distinct else '', data_type)

    if agg_func == 'ndv':
      # NDV is inherently approximate. Compare with some tolerance.
      err = abs(result_lut[key] - int(actual_string))
      rel_err = err / float(result_lut[key])
      # Fixed: removed a leftover Python-2 debug 'print' statement here; it
      # spammed test output and is a syntax error under Python 3.
      assert err <= 1 or rel_err < 0.05
    elif data_type in ('float', 'double') and agg_func != 'count':
      # Compare with a margin of error.
      delta = 1e6 if data_type == 'double' else 1e3
      assert abs(result_lut[key] - float(actual_string)) < delta
    elif data_type == 'timestamp' and agg_func != 'count':
      # Strip off everything past 10s of microseconds.
      ignore_digits = 4
      assert result_lut[key][:-ignore_digits] == actual_string[:-ignore_digits]
    else:
      assert str(result_lut[key]) == actual_string
|
|
|
class TestAggregationQueries(ImpalaTestSuite):
  """Run the aggregation test suite, with codegen enabled and disabled, to exercise our
  non-codegen code"""
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestAggregationQueries, cls).add_test_dimensions()

    # Run every test both with and without codegen.
    cls.ImpalaTestMatrix.add_dimension(
      create_exec_option_dimension(disable_codegen_options=[False, True]))

    # In 'core' exploration, restrict to uncompressed text to limit run time.
    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_dimension(
        create_uncompressed_text_dimension(cls.get_workload()))

  def test_non_codegen_tinyint_grouping(self, vector, unique_database):
    # Regression for IMPALA-901. The test includes an INSERT statement, so can only be run
    # on INSERT-able formats - text only in this case, since the bug doesn't depend on the
    # file format.
    if vector.get_value('table_format').file_format == 'text' \
        and vector.get_value('table_format').compression_codec == 'none':
      self.client.execute("create table %s.imp_901 (col tinyint)" % unique_database)
      self.run_test_case('QueryTest/aggregation_no_codegen_only', vector,
          unique_database)

  def test_aggregation(self, vector):
    """Runs the data-driven aggregation test file on every dimension combination."""
    if vector.get_value('table_format').file_format == 'hbase':
      pytest.xfail(reason="IMPALA-283 - select count(*) produces inconsistent results")
    self.run_test_case('QueryTest/aggregation', vector)

  def test_group_concat(self, vector):
    """group_concat distinct tests
    Required to run directly in python because the order in which results will be
    merged at the final, single-node aggregation step is non-deterministic (if the
    first phase is running on multiple nodes). Need to pull the result apart and
    compare the actual items)"""
    exec_option = vector.get_value('exec_option')
    disable_codegen = exec_option['disable_codegen']
    table_format = vector.get_value('table_format')
    # Test group_concat distinct with other aggregate function and groupings.
    # expected result is the row: 2010,'1, 2, 3, 4','1-2-3-4','2|3|1|4',40,4
    query = """select year, group_concat(distinct string_col),
      group_concat(distinct string_col, '-'), group_concat(distinct string_col, '|'),
      count(string_col), count(distinct string_col)
      from alltypesagg where int_col < 5 and year = 2010 group by year"""
    result = self.execute_query(query, exec_option, table_format=table_format)
    row = (result.data)[0].split("\t")
    assert(len(row) == 6)
    assert(row[0] == '2010')
    # Each group_concat column uses a different delimiter; compare as sets
    # because the item order within a group_concat result is non-deterministic.
    delimiter = [', ', '-', '|']
    for i in range(1, 4):
      assert(set(row[i].split(delimiter[i-1])) == set(['1', '2', '3', '4']))
    assert(row[4] == '40')
    assert(row[5] == '4')
    if not disable_codegen:
      # Verify codegen was enabled for all three stages of the aggregation.
      assert_codegen_enabled(result.runtime_profile, [1, 2, 4])

    # Test group_concat distinct with arrow delimiter, with multiple rows
    query = """select day, group_concat(distinct string_col, "->")
      from (select * from alltypesagg where id % 100 = day order by id limit 99999) a
      group by day order by day"""
    result = self.execute_query(query, exec_option, table_format=table_format)
    # Expected distinct string_col values per day (1-10); order inside each
    # group_concat is non-deterministic so compare as sets.
    string_col = []
    string_col.append(set(['1','101','201','301','401','501','601','701','801','901']))
    string_col.append(set(['2','102','202','302','402','502','602','702','802','902']))
    string_col.append(set(['3','103','203','303','403','503','603','703','803','903']))
    string_col.append(set(['4','104','204','304','404','504','604','704','804','904']))
    string_col.append(set(['5','105','205','305','405','505','605','705','805','905']))
    string_col.append(set(['6','106','206','306','406','506','606','706','806','906']))
    string_col.append(set(['7','107','207','307','407','507','607','707','807','907']))
    string_col.append(set(['8','108','208','308','408','508','608','708','808','908']))
    string_col.append(set(['9','109','209','309','409','509','609','709','809','909']))
    string_col.append(set(['10','110','210','310','410','510','610','710','810','910']))
    assert(len(result.data) == 10)
    for i in range(10):
      row = (result.data)[i].split("\t")
      assert(len(row) == 2)
      assert(row[0] == str(i+1))
      assert(set(row[1].split("->")) == string_col[i])

    # Test group_concat distinct with merge node
    query = """select group_concat(distinct string_col, ' ') from alltypesagg
      where int_col < 10"""
    result = self.execute_query(query, exec_option, table_format=table_format)
    assert(set((result.data)[0].split(" ")) == set(['1','2','3','4','5','6','7','8','9']))
    if not disable_codegen:
      # Verify codegen was enabled for all four stages of the aggregation.
      assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])

  def test_parquet_count_star_optimization(self, vector, unique_database):
    """Runs the parquet-stats count(*) tests, once with the default batch size
    and once with batch_size=1 to exercise the row-batch boundary handling."""
    if (vector.get_value('table_format').file_format != 'text' or
        vector.get_value('table_format').compression_codec != 'none'):
      # No need to run this test on all file formats
      pytest.skip()
    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
    vector.get_value('exec_option')['batch_size'] = 1
    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)

  def test_sampled_ndv(self, vector, unique_database):
    """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
    reasonably made deterministic with existing options so we test it separately.
    The goal of this test is to ensure that SAMPLED_NDV() works on all data types
    and returns approximately sensible estimates. It is not the goal of this test
    to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is expected
    be inaccurate on small data sets like the ones we use in this test."""
    if (vector.get_value('table_format').file_format != 'text' or
        vector.get_value('table_format').compression_codec != 'none'):
      # No need to run this test on all file formats
      pytest.skip()

    # NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and SAMPLED_NDV()
    # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() can do.
    # Expectations: All columns except 'id' and 'timestmap_col' have low NDVs and are
    # expected to be reasonably accurate with SAMPLED_NDV(). For the two high-NDV columns
    # SAMPLED_NDV() is expected to have high variance and error.
    ndv_stmt = """
        select ndv(bool_col), ndv(tinyint_col),
               ndv(smallint_col), ndv(int_col),
               ndv(bigint_col), ndv(float_col),
               ndv(double_col), ndv(string_col),
               ndv(cast(double_col as decimal(5, 0))),
               ndv(cast(double_col as decimal(10, 5))),
               ndv(cast(double_col as decimal(20, 10))),
               ndv(cast(double_col as decimal(38, 33))),
               ndv(cast(string_col as varchar(20))),
               ndv(cast(string_col as char(10))),
               ndv(timestamp_col), ndv(id)
        from functional_parquet.alltypesagg"""
    ndv_result = self.execute_query(ndv_stmt)
    ndv_vals = ndv_result.data[0].split('\t')

    for sample_perc in [0.1, 0.2, 0.5, 1.0]:
      sampled_ndv_stmt = """
          select sampled_ndv(bool_col, {0}), sampled_ndv(tinyint_col, {0}),
                 sampled_ndv(smallint_col, {0}), sampled_ndv(int_col, {0}),
                 sampled_ndv(bigint_col, {0}), sampled_ndv(float_col, {0}),
                 sampled_ndv(double_col, {0}), sampled_ndv(string_col, {0}),
                 sampled_ndv(cast(double_col as decimal(5, 0)), {0}),
                 sampled_ndv(cast(double_col as decimal(10, 5)), {0}),
                 sampled_ndv(cast(double_col as decimal(20, 10)), {0}),
                 sampled_ndv(cast(double_col as decimal(38, 33)), {0}),
                 sampled_ndv(cast(string_col as varchar(20)), {0}),
                 sampled_ndv(cast(string_col as char(10)), {0}),
                 sampled_ndv(timestamp_col, {0}), sampled_ndv(id, {0})
          from functional_parquet.alltypesagg""".format(sample_perc)
      sampled_ndv_result = self.execute_query(sampled_ndv_stmt)
      sampled_ndv_vals = sampled_ndv_result.data[0].split('\t')

      assert len(sampled_ndv_vals) == len(ndv_vals)
      # Low NDV columns. We expect a reasonaby accurate estimate regardless of the
      # sampling percent.
      for i in xrange(0, 14):
        self.appx_equals(int(sampled_ndv_vals[i]), int(ndv_vals[i]), 0.1)
      # High NDV columns. We expect the estimate to have high variance and error.
      # Since we give NDV() and SAMPLED_NDV() the same input data, i.e., we are not
      # actually sampling for SAMPLED_NDV(), we expect the result of SAMPLED_NDV() to
      # be bigger than NDV() proportional to the sampling percent.
      # For example, the column 'id' is a PK so we expect the result of SAMPLED_NDV()
      # with a sampling percent of 0.1 to be approximately 10x of the NDV().
      for i in xrange(14, 16):
        self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0)
|
|
class TestDistinctAggregation(ImpalaTestSuite):
  """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs
  enabled and disabled."""
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestDistinctAggregation, cls).add_test_dimensions()

    # Exercise all four combinations of codegen x shuffle_distinct_exprs.
    cls.ImpalaTestMatrix.add_dimension(
      create_exec_option_dimension_from_dict({
        'disable_codegen': [False, True],
        'shuffle_distinct_exprs': [False, True]
      }))

    # In 'core' exploration, restrict to uncompressed text to limit run time.
    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'text' and
        v.get_value('table_format').compression_codec == 'none')

  def test_distinct(self, vector):
    """Runs the data-driven distinct aggregation tests, skipping formats with
    known issues."""
    if vector.get_value('table_format').file_format == 'hbase':
      # Fixed typo/grammar in the xfail message ("verication to fail").
      pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
                   "making the result verification fail.")
    if vector.get_value('table_format').file_format == 'kudu':
      pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
    self.run_test_case('QueryTest/distinct', vector)
|
|
|
class TestWideAggregationQueries(ImpalaTestSuite):
  """Test that aggregations with many grouping columns work"""
  @classmethod
  def get_workload(self):
    return 'functional-query'

  @classmethod
  def add_test_dimensions(cls):
    super(TestWideAggregationQueries, cls).add_test_dimensions()

    cls.ImpalaTestMatrix.add_dimension(
      create_exec_option_dimension(disable_codegen_options=[False, True]))

    # File format doesn't matter for this test, so run on parquet only.
    cls.ImpalaTestMatrix.add_constraint(
      lambda v: v.get_value('table_format').file_format == 'parquet')

  def test_many_grouping_columns(self, vector):
    """Test that an aggregate with many grouping columns works"""
    result = self.execute_query(
      "select distinct * from widetable_1000_cols",
      vector.get_value('exec_option'),
      table_format=vector.get_value('table_format'))

    # Every row of the table is unique, so the distinct result must equal the
    # full generated data set (row order is irrelevant).
    expected_rows = widetable.get_data(1000, 10, quote_strings=True)
    col_types = parse_column_types(result.schema)
    col_labels = parse_column_labels(result.schema)
    expected = QueryTestResult(expected_rows, col_types, col_labels,
        order_matters=False)
    actual = QueryTestResult(parse_result_rows(result), col_types, col_labels,
        order_matters=False)
    assert expected == actual
|
|
|
class TestTPCHAggregationQueries(ImpalaTestSuite):
  """Aggregation tests against the TPC-H dataset, which provides larger
  aggregations than functional-query."""

  @classmethod
  def get_workload(cls):
    return 'tpch'

  @classmethod
  def add_test_dimensions(cls):
    super(TestTPCHAggregationQueries, cls).add_test_dimensions()
    # Only run against parquet.
    cls.ImpalaTestMatrix.add_constraint(
      lambda v: v.get_value('table_format').file_format in ['parquet'])

  def test_tpch_aggregations(self, vector):
    """Runs the data-driven TPC-H aggregation tests."""
    self.run_test_case('tpch-aggregations', vector)

  def test_tpch_passthrough_aggregations(self, vector):
    """Runs the data-driven TPC-H passthrough-aggregation tests."""
    self.run_test_case('tpch-passthrough-aggregations', vector)