This patch extends the SHOW PARTITIONS statement to allow an optional
WHERE clause that filters partitions based on partition column values.
The implementation adds support for various comparison operators, IN
lists, BETWEEN clauses, IS NULL, and logical AND/OR expressions
involving partition columns. Non-partition columns, subqueries, and
analytic expressions in the WHERE clause are not allowed and will
result in an analysis error.

New analyzer tests have been added to AnalyzeDDLTest#TestShowPartitions
to verify correct parsing, semantic validation, and error handling for
supported and unsupported cases.

Testing:
- Added new unit tests in AnalyzeDDLTest for valid and invalid WHERE
  clause cases.
- Verified functional tests covering partition filtering behavior.

Change-Id: I2e2a14aabcea3fb17083d4ad6f87b7861113f89e
Reviewed-on: http://gerrit.cloudera.org:8080/23566
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
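As an illustration (example queries are ours, not taken from the patch),
the new clause accepts filters such as:

    SHOW PARTITIONS functional.alltypes WHERE year = 2009 AND month IN (1, 2, 3);
    SHOW PARTITIONS functional.alltypes WHERE month BETWEEN 1 AND 6;

while a predicate on a non-partition column, e.g. WHERE id = 1, is
rejected with an analysis error.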
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Impala tests for queries that query metadata and set session settings

from __future__ import absolute_import, division, print_function
import pytest
import re
import datetime

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS, SkipIfCatalogV2
from tests.common.test_dimensions import ALL_NODES_ONLY
from tests.common.test_dimensions import create_exec_option_dimension
from tests.common.test_dimensions import create_uncompressed_text_dimension
from tests.util.filesystem_utils import get_fs_path
from tests.util.event_processor_utils import EventProcessorUtils

# TODO: For these tests to pass, all table metadata must be created
# exhaustively. The tests should be modified to remove that requirement.
class TestMetadataQueryStatements(ImpalaTestSuite):

  CREATE_DATA_SRC_STMT = ("CREATE DATA SOURCE %s LOCATION '" +
      get_fs_path("/test-warehouse/data-sources/test-data-source.jar") +
      "' CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' API_VERSION 'V1'")
  DROP_DATA_SRC_STMT = "DROP DATA SOURCE IF EXISTS %s"
  TEST_DATA_SRC_NAMES = ["show_test_ds1", "show_test_ds2"]
  AVRO_SCHEMA_LOC = get_fs_path("/test-warehouse/avro_schemas/functional/alltypes.json")

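  # The matrix below keeps runtime down: all-node cluster only, codegen
  # enabled, default batch size, uncompressed text tables, and SYNC_DDL
  # varied only on exhaustive runs.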
  @classmethod
  def add_test_dimensions(cls):
    super(TestMetadataQueryStatements, cls).add_test_dimensions()
    sync_ddl_opts = [0, 1]
    if cls.exploration_strategy() != 'exhaustive':
      # Cut down on test runtime by only running with SYNC_DDL=0.
      sync_ddl_opts = [0]

    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=ALL_NODES_ONLY,
        disable_codegen_options=[False],
        batch_sizes=[0],
        sync_ddl=sync_ddl_opts))
    cls.ImpalaTestMatrix.add_dimension(
        create_uncompressed_text_dimension(cls.get_workload()))

  def test_use(self, vector):
    self.run_test_case('QueryTest/use', vector)

  def test_show(self, vector):
    self.run_test_case('QueryTest/show', vector)

  @SkipIfFS.incorrent_reported_ec
  def test_show_stats(self, vector):
    self.run_test_case('QueryTest/show-stats', vector, "functional")

  def test_show_partitions_with_nondeterministic_functions(self):
    """Test SHOW PARTITIONS WHERE with non-deterministic functions
    like rand() and now().
    """

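    # Neither rand() nor now() folds to a constant at analysis time, so these
    # statements exercise runtime evaluation of the WHERE predicate (our
    # reading of the test's intent; the assertions below only check results).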
    # Test rand() - just verify the statement succeeds without errors.
    result = self.execute_query(
        "show partitions functional.alltypes where rand() < 0.5")
    assert result.success, "SHOW PARTITIONS with rand() should succeed"
    # Verify we got some partitions back (rand() typically returns ~0.47
    # without a seed).
    assert len(result.data) > 0, \
        "SHOW PARTITIONS with rand() should return some partitions"

    # Test now() - verify the statement succeeds and returns partitions for
    # the current month.
    current_month = datetime.datetime.now().month
    result = self.execute_query(
        "show partitions functional.alltypes where month = month(now())")
    assert result.success, "SHOW PARTITIONS with now() should succeed"

    # Verify we got exactly the partitions for the current month. alltypes
    # has 2 years (2009, 2010) with all 12 months, so we should get 2
    # partitions (one per year) for the current month.
    partition_months = []
    for row in result.data:
      # Skip the 'Total' row.
      if 'Total' not in row:
        parts = row.split('\t')
        if len(parts) >= 2:
          partition_months.append(int(parts[1]))

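    # Each data row begins with the partition key values as tab-separated
    # columns (year first, then month), and the listing ends with a 'Total'
    # summary row, which the loop above skips.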
    # All returned partitions should be for the current month.
    for month in partition_months:
      assert month == current_month, \
          "Expected month {0}, got {1}".format(current_month, month)

    # We should have 2 partitions (year=2009/month=N and year=2010/month=N).
    assert len(partition_months) == 2, \
        "Expected 2 partitions for current month, got {0}".format(
            len(partition_months))

  def test_describe_path(self, vector, unique_database):
    self.run_test_case('QueryTest/describe-path', vector, unique_database)

  # Missing coverage: DESCRIBE FORMATTED compatibility between Impala and
  # Hive when the data doesn't reside in HDFS.
  @SkipIfFS.hive
  def test_describe_formatted(self, vector, unique_database):
    # IMPALA-10176: test_describe_formatted is broken, so disable it for now.
    pytest.skip()
    # For DESCRIBE FORMATTED, we try to match Hive's output as closely as
    # possible. However, we're inconsistent with our handling of NULLs vs
    # theirs: Impala sometimes specifies 'NULL' where Hive uses an empty
    # string, and Hive sometimes specifies 'null' with padding where Impala
    # uses a sequence of blank spaces. For now we want to leave it that way
    # to not affect users who rely on this output.
    def compare_describe_formatted(impala_results, hive_results):
      for impala, hive in zip(re.split(',|\n', impala_results),
                              re.split(',|\n', hive_results)):
        if impala != hive:
          # If they don't match, check if it's because of the inconsistent
          # null handling.
          impala = impala.replace(' ', '').lower()
          hive = hive.replace(' ', '').lower()
          if not ((impala == "'null'" and hive == "''") or
                  (impala == "''" and hive == "'null'")):
            return False
      return True

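    # For example (illustrative values, not real DESCRIBE output):
    # compare_describe_formatted("'null','a'", "'','a'") is True because the
    # only difference is the tolerated 'null' vs '' mismatch, while
    # compare_describe_formatted("'x'", "'y'") is False.
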
    # Describe a partitioned table.
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted functional.alltypes",
        compare=compare_describe_formatted)
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted functional_text_gzip.alltypes",
        compare=compare_describe_formatted)

    # Describe an unpartitioned table.
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted tpch.lineitem",
        compare=compare_describe_formatted)
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted functional.jointbl",
        compare=compare_describe_formatted)

    # Create and describe an unpartitioned and partitioned Avro table created
    # by Impala without any column definitions.
    # TODO: Instead of creating new tables here, change one of the existing
    # Avro tables to be created without any column definitions.
    self.client.execute("create database if not exists %s" % unique_database)
    self.client.execute(
        "create table %s.%s with serdeproperties ('avro.schema.url'='%s') "
        "stored as avro"
        % (unique_database, "avro_alltypes_nopart", self.AVRO_SCHEMA_LOC))
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted avro_alltypes_nopart",
        compare=compare_describe_formatted)

    self.client.execute(
        "create table %s.%s partitioned by (year int, month int) "
        "with serdeproperties ('avro.schema.url'='%s') stored as avro"
        % (unique_database, "avro_alltypes_part", self.AVRO_SCHEMA_LOC))
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted avro_alltypes_part",
        compare=compare_describe_formatted)

    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted functional.alltypes_view_sub",
        compare=compare_describe_formatted)

    # Test for primary / foreign key constraints.
    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted functional.child_table",
        compare=compare_describe_formatted)

    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted functional.parent_table_2",
        compare=compare_describe_formatted)

    self.exec_and_compare_hive_and_impala_hs2(
        "describe formatted tpcds.store_returns",
        compare=compare_describe_formatted)

  @pytest.mark.execute_serially  # due to data src setup/teardown
  def test_show_data_sources(self, vector):
    try:
      self.__create_data_sources()
      self.run_test_case('QueryTest/show-data-sources', vector)
    finally:
      self.__drop_data_sources()

  def __drop_data_sources(self):
    for name in self.TEST_DATA_SRC_NAMES:
      self.client.execute(self.DROP_DATA_SRC_STMT % (name,))

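  # Dropping first makes creation idempotent across repeated runs, and
  # DROP DATA SOURCE IF EXISTS keeps teardown safe even if creation failed
  # part-way.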
  def __create_data_sources(self):
    self.__drop_data_sources()
    for name in self.TEST_DATA_SRC_NAMES:
      self.client.execute(self.CREATE_DATA_SRC_STMT % (name,))

  @SkipIfFS.hive
  @pytest.mark.execute_serially  # because of use of hardcoded databases
  def test_describe_db(self, vector, cluster_properties):
    self.__test_describe_db_cleanup()
    try:
      # Some versions of HMS will fail to create the database if the managed
      # directory is already present. This is not a test for HMS, so it uses
      # unique directory names to work around the issue.
      self.client.execute("create database impala_test_desc_db1")
      self.client.execute("create database impala_test_desc_db2 "
                          "comment 'test comment'")
      self.client.execute("create database impala_test_desc_db3 "
                          "location '" + get_fs_path("/testdb3") + "'")
      self.client.execute("create database impala_test_desc_db4 "
                          "comment 'test comment' "
                          "location \"" + get_fs_path("/test4.db") + "\"")
      self.client.execute("create database impala_test_desc_db5 "
                          "comment 'test comment' "
                          "managedlocation \"" + get_fs_path("/test5.db") + "\"")
      self.run_stmt_in_hive("create database hive_test_desc_db "
                            "comment 'test comment' "
                            "with dbproperties('pi' = '3.14', 'e' = '2.82')")
      self.run_stmt_in_hive("create database hive_test_desc_db2 "
                            "comment 'test comment' "
                            "managedlocation '" + get_fs_path("/test2.db") + "'")
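      # Databases are created through both Impala and Hive so the DESCRIBE
      # DATABASE output can be checked for comments, locations, managed
      # locations, and dbproperties.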
      if cluster_properties.is_event_polling_enabled():
        # Using HMS event processor - wait until the databases show up.
        assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
        self.client.set_configuration({
            "sync_hms_events_wait_time_s": 10,
            "sync_hms_events_strict_mode": True
        })
        # Waits for the externally created dbs to appear.
        self.confirm_db_exists("hive_test_desc_db")
        self.confirm_db_exists("hive_test_desc_db2")
        self.client.clear_configuration()
      else:
        # Invalidate metadata to pick up the Hive-created dbs.
        self.client.execute("invalidate metadata")
      self.run_test_case('QueryTest/describe-db', vector)
    finally:
      self.__test_describe_db_cleanup()

  def __test_describe_db_cleanup(self):
    self.cleanup_db('hive_test_desc_db', sync_ddl=0)
    self.cleanup_db('hive_test_desc_db2', sync_ddl=0)
    self.cleanup_db('impala_test_desc_db1', sync_ddl=0)
    self.cleanup_db('impala_test_desc_db2', sync_ddl=0)
    self.cleanup_db('impala_test_desc_db3', sync_ddl=0)
    self.cleanup_db('impala_test_desc_db4', sync_ddl=0)
    self.cleanup_db('impala_test_desc_db5', sync_ddl=0)