# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
import pytest

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfFS, SkipIfLocal
from tests.util.event_processor_utils import EventProcessorUtils
from tests.util.filesystem_utils import IS_ISILON, WAREHOUSE
from tests.util.hdfs_util import (
    HdfsConfig,
    get_webhdfs_client,
    get_webhdfs_client_from_conf)

# Name of the partitioned table that is re-created before every test method of
# TestInsertBehaviourCustomCluster and dropped afterwards.
TEST_TBL = "insert_inherit_permission"


@SkipIfFS.hdfs_acls
@SkipIfLocal.hdfs_client
class TestInsertBehaviourCustomCluster(CustomClusterTestSuite):
  """Checks the directory permissions of partitions created by INSERT when the cluster
  is started with --insert_inherit_permissions set to true or false."""

  @classmethod
  def setup_class(cls):
    """Creates the WebHDFS client shared by all tests of this class.

    The client is built from the minicluster XML configuration unless an explicit
    namenode HTTP address ("host:port") was passed on the pytest command line."""
    super(TestInsertBehaviourCustomCluster, cls).setup_class()
    if pytest.config.option.namenode_http_address is None:
      hdfs_conf = HdfsConfig(pytest.config.option.minicluster_xml_conf)
      cls.hdfs_client = get_webhdfs_client_from_conf(hdfs_conf)
    else:
      host, port = pytest.config.option.namenode_http_address.split(":")
      cls.hdfs_client = get_webhdfs_client(host, port)

  def _check_partition_perms(self, part, perms):
    """Asserts that the directory of partition 'part' (a path relative to the table
    directory, e.g. "p1=1/p2=2/") has the permission string 'perms'."""
    ls = self.hdfs_client.get_file_dir_status("test-warehouse/%s/%s" % (TEST_TBL, part))
    assert ls['FileStatus']['permission'] == perms

  def _get_impala_client(self):
    """Returns a new HS2 client connected to any impalad of the cluster. The caller is
    responsible for closing it."""
    impalad = self.cluster.get_any_impalad()
    return impalad.service.create_hs2_client()

  def _create_test_tbl(self):
    """(Re)creates TEST_TBL with three partition columns and a single partition
    (p1=1, p2=1, p3=1). All statements are run with sync_ddl=1."""
    client = self._get_impala_client()
    options = {'sync_ddl': '1'}
    try:
      self.execute_query_expect_success(client, "DROP TABLE IF EXISTS %s" % TEST_TBL,
                                        query_options=options)
      self.execute_query_expect_success(client,
                                        "CREATE TABLE {0} (col int) PARTITIONED"
                                        " BY (p1 int, p2 int, p3 int) location"
                                        " '{1}/{0}'".format(TEST_TBL, WAREHOUSE),
                                        query_options=options)
      self.execute_query_expect_success(client, "ALTER TABLE %s"
                                        " ADD PARTITION(p1=1, p2=1, p3=1)" % TEST_TBL,
                                        query_options=options)
    finally:
      client.close()

  def _drop_test_tbl(self):
    """Drops TEST_TBL if it exists."""
    client = self._get_impala_client()
    try:
      self.execute_query_expect_success(client, "drop table if exists %s" % TEST_TBL)
    finally:
      # Close the client even when the DROP fails so the connection is not leaked.
      client.close()

  def setup_method(self, method):
    """Runs the base-class setup, then creates a fresh TEST_TBL."""
    super(TestInsertBehaviourCustomCluster, self).setup_method(method)
    self._create_test_tbl()

  def teardown_method(self, method):
    """Drops TEST_TBL, then runs the base-class teardown."""
    try:
      self._drop_test_tbl()
    finally:
      # The base-class teardown must run even if dropping the table failed.
      super(TestInsertBehaviourCustomCluster, self).teardown_method(method)

  @SkipIfLocal.root_path
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--insert_inherit_permissions=true")
  def test_insert_inherit_permission(self):
    """Create a table with three partition columns to test permission inheritance"""
    client = self._get_impala_client()
    try:
      self.hdfs_client.chmod("test-warehouse/%s/p1=1/" % TEST_TBL, "777")

      # 1. INSERT that creates two new directories gets permissions from parent
      self.execute_query_expect_success(client, "INSERT INTO %s"
                                        " PARTITION(p1=1, p2=2, p3=2) VALUES(1)" % TEST_TBL)
      self._check_partition_perms("p1=1/p2=2/", "777")
      self._check_partition_perms("p1=1/p2=2/p3=2/", "777")

      # 2. INSERT that creates one new directory gets permissions from parent
      self.execute_query_expect_success(client, "INSERT INTO %s"
                                        " PARTITION(p1=1, p2=2, p3=3) VALUES(1)" % TEST_TBL)
      self._check_partition_perms("p1=1/p2=2/p3=3/", "777")

      # 3. INSERT that creates no new directories keeps standard permissions
      self.hdfs_client.chmod("test-warehouse/%s/p1=1/p2=2" % TEST_TBL, "744")
      self.execute_query_expect_success(client, "INSERT INTO %s"
                                        " PARTITION(p1=1, p2=2, p3=3) VALUES(1)" % TEST_TBL)
      self._check_partition_perms("p1=1/p2=2/", "744")
      self._check_partition_perms("p1=1/p2=2/p3=3/", "777")
    finally:
      client.close()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args("--insert_inherit_permissions=false")
  def test_insert_inherit_permission_disabled(self):
    """Check that turning off insert permission inheritance works correctly."""
    client = self._get_impala_client()
    try:
      # Record the permissions of the existing partition directory before it is
      # chmod-ed; newly created directories are expected to carry these.
      ls = self.hdfs_client.get_file_dir_status("test-warehouse/%s/p1=1/" % TEST_TBL)
      default_perms = ls['FileStatus']['permission']

      self.hdfs_client.chmod("test-warehouse/%s/p1=1/" % TEST_TBL, "777")

      self.execute_query_expect_success(client, "INSERT INTO %s"
                                        " PARTITION(p1=1, p2=3, p3=4) VALUES(1)" % TEST_TBL)
      # Would be 777 if inheritance was enabled
      if not IS_ISILON:  # IMPALA-4221
        self._check_partition_perms("p1=1/p2=3/", default_perms)
      self._check_partition_perms("p1=1/p2=3/p3=4/", default_perms)
    finally:
      client.close()


@SkipIfFS.hive
class TestInsertUnSyncedPartition(CustomClusterTestSuite):
  """Tests INSERT into a partition that was added in Hive while catalogd's event
  polling is disabled (--hms_event_polling_interval_s=0)."""

  @classmethod
  def setup_class(cls):
    super(TestInsertUnSyncedPartition, cls).setup_class()

  @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=0")
  def test_insert_unsynced_partition(self, unique_database):
    """Regression test for IMPALA-12257. Tests with event-processing disabled so
    catalogd can easily have unsynced partition with HMS."""
    self._test_insert_on_unsynced_partition(unique_database, "part1", False, False)
    self._test_insert_on_unsynced_partition(unique_database, "part2", False, True)
    self._test_insert_on_unsynced_partition(unique_database, "txn_part1", True, False)
    self._test_insert_on_unsynced_partition(unique_database, "txn_part2", True, True)

  def _test_insert_on_unsynced_partition(self, db, tbl, is_transactional, is_overwrite):
    """Creates table 'db.tbl', adds partition p=0 through Hive, inserts into the same
    partition through Impala and verifies the generated HMS events and the table data.

    'is_transactional' creates the table as an insert-only transactional table.
    'is_overwrite' makes the Impala statement an INSERT OVERWRITE instead of an
    INSERT INTO."""
    tbl_name = db + "." + tbl
    create_stmt = """
        create table {0} (i int) partitioned by (p int)
        stored as textfile""".format(tbl_name)
    if is_transactional:
      create_stmt += """ tblproperties(
          'transactional'='true',
          'transactional_properties'='insert_only')"""
    self.client.execute(create_stmt)
    # Run any query on the table to make it loaded in catalogd.
    self.client.execute("describe " + tbl_name)
    # Add the partition in Hive so catalogd is not aware of it.
    self.run_stmt_in_hive("""
        insert into {0} partition (p=0) values (0)""".format(tbl_name))
    # Track the last event id so we can fetch the generated events
    last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    # Insert the new partition in Impala.
    self.client.execute("""
        insert {0} {1} partition(p=0) values (1)
        """.format("overwrite" if is_overwrite else "into", tbl_name))
    events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
    if is_transactional:
      assert len(events) > 2
      assert events[0].eventType == "OPEN_TXN"
      assert events[1].eventType == "ALLOC_WRITE_ID_EVENT"
      assert events[1].dbName == db
      assert events[1].tableName == tbl
      # There is an empty ADD_PARTITION event because Impala invokes the add_partitions
      # HMS API even though no new partitions are really added. This might change in
      # future Hive versions. Here we just verify whether the last event is COMMIT_TXN.
      assert events[-1].eventType == "COMMIT_TXN"
    else:
      assert len(events) > 0
      last_event = events[-1]
      assert last_event.dbName == db
      assert last_event.tableName == tbl
      assert last_event.eventType == "INSERT"
    res = self.client.execute("select * from " + tbl_name)
    if is_overwrite:
      assert res.data == ["1\t0"]
    else:
      assert "0\t0" in res.data
      assert "1\t0" in res.data
      assert len(res.data) == 2