impala/tests/metadata/test_refresh_partition.py
stiga-huang b37f4509fa IMPALA-14089: Support REFRESH on multiple partitions
Currently we only support REFRESH on the whole table or on a specific
partition:
  REFRESH [db_name.]table_name [PARTITION (key_col1=val1 [, key_col2=val2...])]

If users want to refresh multiple partitions, they have to submit
multiple statements, one for each partition. This has some drawbacks:
 - It requires holding the table write lock inside catalogd multiple
   times, which increases lock contention with other read/write
   operations on the same table, e.g. getPartialCatalogObject requests
   from coordinators.
 - The catalog version of the table is bumped multiple times.
   Coordinators in local catalog mode are more likely to see different
   versions across their getPartialCatalogObject requests, so they have
   to retry planning to resolve InconsistentMetadataFetchException.
 - Partitions are reloaded sequentially, whereas they should be
   reloaded in parallel as we do when refreshing the whole table.

This patch extends the syntax to refresh multiple partitions in one
statement:
  REFRESH [db_name.]table_name
  [PARTITION (key_col1=val1 [, key_col2=val2...])
   [PARTITION (key_col1=val3 [, key_col2=val4...])...]]
Example:
  REFRESH foo PARTITION(p=0) PARTITION(p=1) PARTITION(p=2);

TResetMetadataRequest is extended to carry a list of partition specs
for this. If the list has only one item, we still use the existing
logic of reloading a specific partition. If the list has more than one
item, the partitions are reloaded in parallel. This is implemented in
CatalogServiceCatalog#reloadTable(): previously it always invoked
HdfsTable#load() with partitionsToUpdate=null; now the parameter is
set when the TResetMetadataRequest carries the partition list.
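
In pseudo-code, the dispatch looks roughly like this (a simplified,
illustrative sketch only, not the actual catalogd code; all names here
are made up):

  from concurrent.futures import ThreadPoolExecutor

  def reload_partition(table, spec):
    # Stand-in for reloading one partition's metadata and file listing.
    print("reloading partition %s of table %s" % (spec, table))

  def refresh(table, partition_specs):
    if len(partition_specs) == 1:
      # A single spec keeps the existing single-partition reload path.
      reload_partition(table, partition_specs[0])
    else:
      # Multiple specs are reloaded in parallel, like a whole-table refresh.
      with ThreadPoolExecutor() as pool:
        for spec in partition_specs:
          pool.submit(reload_partition, table, spec)

  refresh("foo", ["p=0", "p=1", "p=2"])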

HMS notification events of RELOAD type are fired for each partition if
enable_reload_events is turned on. Once HIVE-28967 is resolved, we can
fire a single event for multiple partitions.

Updated docs in impala_refresh.xml.

Tests:
 - Added FE and e2e tests

Change-Id: Ie5b0deeaf23129ed6e1ba2817f54291d7f63d04e
Reviewed-on: http://gerrit.cloudera.org:8080/22938
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-05-28 05:18:53 +00:00


# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import, division, print_function
from subprocess import check_call

from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_dimensions import create_uncompressed_text_dimension
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
from tests.util.filesystem_utils import get_fs_path


@SkipIfFS.hive
class TestRefreshPartition(ImpalaTestSuite):
  """
  This class tests the functionality to refresh a partition individually
  for a table in HDFS
  """

  @classmethod
  def default_test_protocol(cls):
    return HS2

  @classmethod
  def add_test_dimensions(cls):
    super(TestRefreshPartition, cls).add_test_dimensions()
    # There is no reason to run these tests using all dimensions.
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    cls.ImpalaTestMatrix.add_dimension(
        create_uncompressed_text_dimension(cls.get_workload()))

  def test_refresh_invalid_partition(self, unique_database):
    """
    Trying to refresh a partition that does not exist does not modify anything
    either in impala or hive.
    """
    table_name = unique_database + '.' + "partition_test_table"
    self.client.execute(
        'create table %s (x int) partitioned by (y int, z int)' %
        table_name)
    self.client.execute(
        'alter table %s add partition (y=333, z=5309)' % table_name)
    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
    self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
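    # Refreshing multiple nonexistent partitions in one statement is also a no-op.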
    self.client.execute(
        'refresh %s partition (y=71, z=8857) partition (y=0, z=0)' % table_name)
    assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
    assert ['y=333/z=5309'] == self.hive_partition_names(table_name)

  def test_remove_data_and_refresh(self, unique_database):
    """
    Data removed through hive is visible in impala after refresh of partition.
    """
    expected_error = 'Error(2): No such file or directory'
    table_name = unique_database + '.' + "partition_test_table"
    self.client.execute(
        'create table %s (x int) partitioned by (y int, z int)' %
        table_name)
    self.client.execute(
        'alter table %s add partition (y=333, z=5309)' % table_name)
    self.client.execute(
        'insert into table %s partition (y=333, z=5309) values (2)' % table_name)
    assert '2\t333\t5309' == self.client.execute(
        'select * from %s' % table_name).get_data()
    self.run_stmt_in_hive(
        'alter table %s drop partition (y=333, z=5309)' % table_name)
    # Query the table. With file handle caching, this may not produce an error,
    # because the file handles are still open in the cache. If the system does
    # produce an error, it should be the expected error.
    try:
      self.client.execute("select * from %s" % table_name)
    except IMPALA_CONNECTION_EXCEPTION as e:
      assert expected_error in str(e)
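    # Refresh the dropped partition; the table should then appear empty.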
    self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == ['0']
    # Test multiple partitions
    self.client.execute(
        'insert into table %s partition (y, z) values '
        '(2, 33, 444), (3, 44, 555), (4, 55, 666)' % table_name)
    result = self.client.execute('select * from %s' % table_name)
    assert '2\t33\t444' in result.data
    assert '3\t44\t555' in result.data
    assert '4\t55\t666' in result.data
    assert len(result.data) == 3
    # Drop two partitions in Hive
    self.run_stmt_in_hive(
        'alter table %s drop partition (y>33)' % table_name)
    # Query the table. With file handle caching, this may not produce an error,
    # because the file handles are still open in the cache. If the system does
    # produce an error, it should be the expected error.
    try:
      self.client.execute("select * from %s" % table_name)
    except IMPALA_CONNECTION_EXCEPTION as e:
      assert expected_error in str(e)
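    # Refresh all three partitions in one statement. Only the y=33 row remains
    # since the other two partitions were dropped in Hive.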
    self.client.execute(
        'refresh %s partition (y=33, z=444) partition (y=44, z=555) '
        'partition (y=55, z=666)' % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == ['1']

  def test_add_delete_data_to_hdfs_and_refresh(self, unique_database):
    """
    Data added/deleted directly in HDFS is visible in impala after refresh of
    partition.
    """
    table_name = unique_database + '.' + "partition_test_table"
    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
    file_name = "alltypes.parq"
    src_file = get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
                           "day=9/*.parq")
    file_num_rows = 1000
    self.client.execute("""
        create table %s like functional.alltypes stored as parquet
        location '%s'
        """ % (table_name, table_location))
    for month in range(1, 5):
      self.client.execute("alter table %s add partition (year=2010, month=%d)" %
                          (table_name, month))
    self.client.execute("refresh %s" % table_name)
    # Check that there is no data in the table
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(0)]
    dst_path = "%s/year=2010/month=1/%s" % (table_location, file_name)
    self.filesystem_client.copy(src_file, dst_path, overwrite=True)
    # Check that data added is not visible before refresh
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(0)]
    # Check that data is visible after refresh
    self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(file_num_rows)]
    # Check that after deleting the file and refreshing, it returns zero rows
    check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
    self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(0)]
    # Test multiple partitions
    for month in range(2, 5):
      dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, file_name)
      self.filesystem_client.copy(src_file, dst_path, overwrite=True)
    # Check that data added is not visible before refresh
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == ['0']
    # Check that data is visible after refresh
    self.client.execute(
        "refresh %s partition (year=2010, month=2) partition (year=2010, month=3) "
        "partition (year=2010, month=4)" % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(file_num_rows * 3)]
    # Check that after deleting the files and refreshing, it returns zero rows
    for month in range(2, 5):
      dst_path = "%s/year=2010/month=%d/%s" % (table_location, month, file_name)
      check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
    self.client.execute(
        "refresh %s partition (year=2010, month=2) partition (year=2010, month=3) "
        "partition (year=2010, month=4)" % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == ['0']

  def test_confirm_individual_refresh(self, unique_database):
    """
    Data added directly to HDFS is only visible for the partition refreshed
    """
    table_name = unique_database + '.' + "partition_test_table"
    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
    file_name = "alltypes.parq"
    src_file = get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
                           "day=9/*.parq")
    file_num_rows = 1000
    self.client.execute("""
        create table %s like functional.alltypes stored as parquet
        location '%s'
        """ % (table_name, table_location))
    for month in range(1, 6):
      self.client.execute("alter table %s add partition (year=2010, month=%s)" %
                          (table_name, month))
    self.client.execute("refresh %s" % table_name)
    # Check that there is no data in table
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(0)]
    dst_path = table_location + "/year=2010/month=%s/" + file_name
    for month in range(1, 6):
      self.filesystem_client.copy(src_file, dst_path % month, overwrite=True)
    # Check that data added is not visible before refresh
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(0)]
    # Check that data is visible after refresh on the first partition only
    self.client.execute("refresh %s partition (year=2010, month=1)" %
                        table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(file_num_rows)]
    # Check that the data is not yet visible for the second partition
    # that was not refreshed
    result = self.client.execute(
        "select count(*) from %s where year=2010 and month=2" % table_name)
    assert result.data == [str(0)]
    # Check that data is visible for the second partition after refresh
    self.client.execute("refresh %s partition (year=2010, month=2)" % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(file_num_rows * 2)]
    # Refresh multiple partitions
    self.client.execute(
        "refresh %s partition (year=2010, month=3) partition (year=2010, month=4) "
        "partition (year=2010, month=5)" % table_name)
    result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(file_num_rows * 5)]