# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # Client tests for SQL statement authorization from __future__ import absolute_import, division, print_function from getpass import getuser import random import pytest from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.file_utils import assert_file_in_dir_contains from tests.common.test_result_verifier import error_msg_equal from tests.common.test_vector import HS2 from tests.custom_cluster.test_local_catalog import TestLocalCatalogRetries PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select'] ADMIN = "admin" class TestAuthorization(CustomClusterTestSuite): @classmethod def default_test_protocol(cls): return HS2 @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( "--server_name=server1 --authorization_policy_file=ignored_file", impala_log_dir="{deprecated_flags}", tmp_dir_placeholders=['deprecated_flags'], disable_log_buffering=True) def test_deprecated_flags(self): assert_file_in_dir_contains(self.impala_log_dir, "Ignoring removed flag " "authorization_policy_file") @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--min_privilege_set_for_show_stmts=select", catalogd_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger") def test_ranger_show_iceberg_metadata_tables(self, unique_name): unique_db = unique_name + "_db" tbl_name = "ice" full_tbl_name = "{}.{}".format(unique_db, tbl_name) non_existent_suffix = "_a" non_existent_tbl_name = full_tbl_name + non_existent_suffix priv = "select" user = getuser() query = "show metadata tables in {}".format(full_tbl_name) query_non_existent = "show metadata tables in {}".format(non_existent_tbl_name) admin_client = self.create_impala_client(user=ADMIN) user_client = self.create_impala_client(user=user) try: admin_client.execute("drop database if exists {} cascade".format(unique_db)) admin_client.execute("create database {}".format(unique_db)) admin_client.execute( "create table {} (i int) stored as iceberg".format(full_tbl_name)) # Check that when the user has no privileges on the database, the error is the same # if we try an existing or a non-existing table. exc1_str = str(self.execute_query_expect_failure(user_client, query)) exc2_str = str(self.execute_query_expect_failure(user_client, query_non_existent)) assert error_msg_equal(exc1_str, exc2_str) assert "AuthorizationException" in exc1_str assert "does not have privileges to access" # Check that there is no error when the user has access to the table. admin_client.execute("grant {priv} on database {db} to user {target_user}".format( priv=priv, db=unique_db, target_user=user)) self.execute_query_expect_success(user_client, query) finally: admin_client.execute( "revoke {priv} on database {db} from user {target_user}".format( priv=priv, db=unique_db, target_user=user)) admin_client.execute("drop database if exists {} cascade".format(unique_db)) @staticmethod def _verify_show_dbs(result, unique_name, visibility_privileges=PRIVILEGES): """ Helper function for verifying the results of SHOW DATABASES below. Only show databases with privileges implying any of the visibility_privileges. """ for priv in PRIVILEGES: # Result lines are in the format of "db_name\tdb_comment" db_name = 'db_%s_%s\t' % (unique_name, priv) if priv != 'all' and priv not in visibility_privileges: assert db_name not in result.data else: assert db_name in result.data def _test_ranger_show_stmts_helper(self, unique_name, visibility_privileges): unique_db = unique_name + "_db" admin_client = self.create_impala_client(user=ADMIN) try: admin_client.execute("drop database if exists %s cascade" % unique_db) admin_client.execute("create database %s" % unique_db) for priv in PRIVILEGES: admin_client.execute("create database db_%s_%s" % (unique_name, priv)) admin_client.execute("grant {0} on database db_{1}_{2} to user {3}" .format(priv, unique_name, priv, getuser())) admin_client.execute("create table %s.tbl_%s (i int)" % (unique_db, priv)) admin_client.execute("grant {0} on table {1}.tbl_{2} to user {3}" .format(priv, unique_db, priv, getuser())) # Admin can still see all the databases and tables result = admin_client.execute("show databases") TestAuthorization._verify_show_dbs(result, unique_name) result = admin_client.execute("show tables in %s" % unique_db) assert result.data == ["tbl_%s" % p for p in PRIVILEGES] # Check SHOW DATABASES and SHOW TABLES using another username result = self.client.execute("show databases") TestAuthorization._verify_show_dbs(result, unique_name, visibility_privileges) result = self.client.execute("show tables in %s" % unique_db) # Only show tables with privileges implying any of the visibility privileges assert 'tbl_all' in result.data # ALL can imply to any privilege for p in visibility_privileges: assert 'tbl_%s' % p in result.data finally: admin_client.execute("drop database if exists %s cascade" % unique_db) for priv in PRIVILEGES: admin_client.execute( "drop database if exists db_%s_%s cascade" % (unique_name, priv)) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--min_privilege_set_for_show_stmts=select", catalogd_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger") def test_ranger_show_stmts_with_select(self, unique_name): self._test_ranger_show_stmts_helper(unique_name, ['select']) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--min_privilege_set_for_show_stmts=select,insert", catalogd_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger") def test_ranger_show_stmts_with_select_insert(self, unique_name): self._test_ranger_show_stmts_helper(unique_name, ['select', 'insert']) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--min_privilege_set_for_show_stmts=any", catalogd_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger") def test_ranger_show_stmts_with_any(self, unique_name): self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--num_check_authorization_threads=%d" % (random.randint(2, 128)), catalogd_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger") def test_num_check_authorization_threads_with_ranger(self, unique_name): self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES) @CustomClusterTestSuite.with_args( impalad_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--use_local_catalog=true", catalogd_args="--server-name=server1 --ranger_service_type=hive " "--ranger_app_id=impala --authorization_provider=ranger " "--catalog_topic_mode=minimal") def test_local_catalog_show_dbs_with_transient_db(self, unique_name): """Regression test for IMPALA-13170""" # Starts a background thread to create+drop the transient db. # Use admin user to have create+drop privileges. unique_database = unique_name + "_db" admin_client = self.create_impala_client(user=ADMIN) TestLocalCatalogRetries._run_show_dbs_with_transient_db( unique_database, admin_client, self.client)