Files
impala/tests/authorization/test_authorization.py
stiga-huang a41c5cbfdd IMPALA-14416: JniFrontend.getDbs() should handle InconsistentMetadataFetchException
JniFrontend.getDbs() returns the thrift representation of all the dbs.
This might trigger multiple getPartialCatalogObject requests to catalogd
and could fail in InconsistentMetadataFetchException, e.g. if a db is
removed after coordinator fetching the db name list and before
coordinator fetching the msDb of that db.

This patch fixes the issue by retrying the above steps when hitting
InconsistentMetadataFetchException, similar to what other methods in
Frontend do. Adds getThriftDbs() in Frontend to directly return the
thrift db list so JniFrontend can use it directly and the retry can be
added inside Frontend.java.

TestAuthorization.test_local_catalog_show_dbs_with_transient_db is an
existing test to verify a similar problem. Running this test with
authorization disabled can reproduce the current bug. So this patch
extracts the test code into
TestLocalCatalogRetries._run_show_dbs_with_transient_db() and share it
in both authz enabled and disabled tests.

Tests
 - Ran TestLocalCatalogRetries.test_show_dbs_retry 60 times. Without the
   fix, it fails in about a dozen times.

Change-Id: Ib337f88a2ac0f35142417f6cee51d30497f12845
Reviewed-on: http://gerrit.cloudera.org:8080/23402
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2025-09-10 10:51:07 +00:00

196 lines
9.5 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Client tests for SQL statement authorization
from __future__ import absolute_import, division, print_function
from getpass import getuser
import random
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import assert_file_in_dir_contains
from tests.common.test_result_verifier import error_msg_equal
from tests.common.test_vector import HS2
from tests.custom_cluster.test_local_catalog import TestLocalCatalogRetries
PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select']
ADMIN = "admin"
class TestAuthorization(CustomClusterTestSuite):
@classmethod
def default_test_protocol(cls):
return HS2
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
"--server_name=server1 --authorization_policy_file=ignored_file",
impala_log_dir="{deprecated_flags}", tmp_dir_placeholders=['deprecated_flags'],
disable_log_buffering=True)
def test_deprecated_flags(self):
assert_file_in_dir_contains(self.impala_log_dir, "Ignoring removed flag "
"authorization_policy_file")
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--min_privilege_set_for_show_stmts=select",
catalogd_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger")
def test_ranger_show_iceberg_metadata_tables(self, unique_name):
unique_db = unique_name + "_db"
tbl_name = "ice"
full_tbl_name = "{}.{}".format(unique_db, tbl_name)
non_existent_suffix = "_a"
non_existent_tbl_name = full_tbl_name + non_existent_suffix
priv = "select"
user = getuser()
query = "show metadata tables in {}".format(full_tbl_name)
query_non_existent = "show metadata tables in {}".format(non_existent_tbl_name)
admin_client = self.create_impala_client(user=ADMIN)
user_client = self.create_impala_client(user=user)
try:
admin_client.execute("drop database if exists {} cascade".format(unique_db))
admin_client.execute("create database {}".format(unique_db))
admin_client.execute(
"create table {} (i int) stored as iceberg".format(full_tbl_name))
# Check that when the user has no privileges on the database, the error is the same
# if we try an existing or a non-existing table.
exc1_str = str(self.execute_query_expect_failure(user_client, query))
exc2_str = str(self.execute_query_expect_failure(user_client, query_non_existent))
assert error_msg_equal(exc1_str, exc2_str)
assert "AuthorizationException" in exc1_str
assert "does not have privileges to access"
# Check that there is no error when the user has access to the table.
admin_client.execute("grant {priv} on database {db} to user {target_user}".format(
priv=priv, db=unique_db, target_user=user))
self.execute_query_expect_success(user_client, query)
finally:
admin_client.execute(
"revoke {priv} on database {db} from user {target_user}".format(
priv=priv, db=unique_db, target_user=user))
admin_client.execute("drop database if exists {} cascade".format(unique_db))
@staticmethod
def _verify_show_dbs(result, unique_name, visibility_privileges=PRIVILEGES):
""" Helper function for verifying the results of SHOW DATABASES below.
Only show databases with privileges implying any of the visibility_privileges.
"""
for priv in PRIVILEGES:
# Result lines are in the format of "db_name\tdb_comment"
db_name = 'db_%s_%s\t' % (unique_name, priv)
if priv != 'all' and priv not in visibility_privileges:
assert db_name not in result.data
else:
assert db_name in result.data
def _test_ranger_show_stmts_helper(self, unique_name, visibility_privileges):
unique_db = unique_name + "_db"
admin_client = self.create_impala_client(user=ADMIN)
try:
admin_client.execute("drop database if exists %s cascade" % unique_db)
admin_client.execute("create database %s" % unique_db)
for priv in PRIVILEGES:
admin_client.execute("create database db_%s_%s" % (unique_name, priv))
admin_client.execute("grant {0} on database db_{1}_{2} to user {3}"
.format(priv, unique_name, priv, getuser()))
admin_client.execute("create table %s.tbl_%s (i int)" % (unique_db, priv))
admin_client.execute("grant {0} on table {1}.tbl_{2} to user {3}"
.format(priv, unique_db, priv, getuser()))
# Admin can still see all the databases and tables
result = admin_client.execute("show databases")
TestAuthorization._verify_show_dbs(result, unique_name)
result = admin_client.execute("show tables in %s" % unique_db)
assert result.data == ["tbl_%s" % p for p in PRIVILEGES]
# Check SHOW DATABASES and SHOW TABLES using another username
result = self.client.execute("show databases")
TestAuthorization._verify_show_dbs(result, unique_name, visibility_privileges)
result = self.client.execute("show tables in %s" % unique_db)
# Only show tables with privileges implying any of the visibility privileges
assert 'tbl_all' in result.data # ALL can imply to any privilege
for p in visibility_privileges:
assert 'tbl_%s' % p in result.data
finally:
admin_client.execute("drop database if exists %s cascade" % unique_db)
for priv in PRIVILEGES:
admin_client.execute(
"drop database if exists db_%s_%s cascade" % (unique_name, priv))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--min_privilege_set_for_show_stmts=select",
catalogd_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger")
def test_ranger_show_stmts_with_select(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, ['select'])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--min_privilege_set_for_show_stmts=select,insert",
catalogd_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger")
def test_ranger_show_stmts_with_select_insert(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, ['select', 'insert'])
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--min_privilege_set_for_show_stmts=any",
catalogd_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger")
def test_ranger_show_stmts_with_any(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--num_check_authorization_threads=%d" % (random.randint(2, 128)),
catalogd_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger")
def test_num_check_authorization_threads_with_ranger(self, unique_name):
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
@CustomClusterTestSuite.with_args(
impalad_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--use_local_catalog=true",
catalogd_args="--server-name=server1 --ranger_service_type=hive "
"--ranger_app_id=impala --authorization_provider=ranger "
"--catalog_topic_mode=minimal")
def test_local_catalog_show_dbs_with_transient_db(self, unique_name):
"""Regression test for IMPALA-13170"""
# Starts a background thread to create+drop the transient db.
# Use admin user to have create+drop privileges.
unique_database = unique_name + "_db"
admin_client = self.create_impala_client(user=ADMIN)
TestLocalCatalogRetries._run_show_dbs_with_transient_db(
unique_database, admin_client, self.client)