# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

import pytest

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite


class TestRefreshInvalidPartition(CustomClusterTestSuite):
  """Custom-cluster tests for REFRESH on partitions that do not exist."""

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    catalogd_args="--topic_update_log_gc_frequency=10")
  def test_refresh_invalid_partition_with_sync_ddl(self, unique_database):
    """
    Regression test for IMPALA-12448. Avoid getting stuck when refreshing a
    non-existent partition with sync_ddl.
    """
    table_name_1 = unique_database + '.' + "partition_test_table_1"
    table_name_2 = unique_database + '.' + "partition_test_table_2"

    # Create two identical partitioned tables, each holding one row in x=1.
    for table_name in (table_name_1, table_name_2):
      self.client.execute(
          'create table %s (y int) partitioned by (x int)' % table_name)
    for table_name in (table_name_1, table_name_2):
      self.client.execute(
          'insert into table %s partition (x=1) values(1)' % table_name)
    for table_name in (table_name_1, table_name_2):
      assert '1\t1' == self.client.execute(
          'select * from %s' % table_name).get_data()

    # Run it multiple times so that at least one topic update log GC is
    # triggered.
    for _ in range(15):
      self.execute_query('refresh %s' % table_name_1,
          query_options={"SYNC_DDL": "true"})

    # Refresh a non-existent partition with sync_ddl. EXEC_TIME_LIMIT_S bounds
    # the statement so a hang shows up as a failure instead of blocking the run.
    self.execute_query_expect_success(self.client,
        'refresh %s partition (x=999)' % table_name_2,
        query_options={"SYNC_DDL": "true", "EXEC_TIME_LIMIT_S": "30"})

  @CustomClusterTestSuite.with_args(
    statestored_args="--statestore_update_frequency_ms=5000")
  def test_refresh_missing_partition(self, unique_database):
    """
    Refresh a partition that has just been dropped and check that the default
    client and the clients of impalads 1 and 2 all list the same partitions.

    Partition p=0 is dropped through the default client with SYNC_DDL=false and
    then refreshed through impalad 1 with SYNC_DDL=true. Afterwards
    'show partitions' must return two lines from each of the three clients.
    NOTE(review): the 5000 ms statestore update frequency presumably widens the
    window in which the impalads hold different catalog versions - confirm.
    """
    show_parts_stmt = 'show partitions {}.tbl'.format(unique_database)
    # Track every extra client so that all of them are closed even when client
    # creation or one of the assertions below fails.
    clients = []
    try:
      for idx in (1, 2):
        clients.append(self.cluster.impalads[idx].service.create_hs2_client())
      client1, client2 = clients

      self.client.execute('create table {}.tbl (i int) partitioned by (p int)'
          .format(unique_database))
      self.execute_query(
          'insert into {}.tbl partition(p) values (0,0), (1,1)'.format(unique_database),
          query_options={"SYNC_DDL": "true"})
      self.execute_query_expect_success(
          self.client,
          'alter table {}.tbl drop partition(p=0)'.format(unique_database),
          {"SYNC_DDL": "false"})
      self.execute_query_expect_success(
          client1,
          'refresh {}.tbl partition(p=0)'.format(unique_database),
          {"SYNC_DDL": "true"})

      # First line is the header. Only one partition should be shown so the
      # result has two lines. The same holds for every client, checked in the
      # order client2, client1, default client.
      for client in (client2, client1, self.client):
        res = self.execute_query_expect_success(client, show_parts_stmt)
        assert len(res.data) == 2
    finally:
      for client in clients:
        client.close()