mirror of
https://github.com/apache/impala.git
synced 2025-12-19 09:58:28 -05:00
JniFrontend.getDbs() returns the thrift representation of all the dbs. This might trigger multiple getPartialCatalogObject requests to catalogd and could fail in InconsistentMetadataFetchException, e.g. if a db is removed after coordinator fetching the db name list and before coordinator fetching the msDb of that db. This patch fixes the issue by retrying the above steps when hitting InconsistentMetadataFetchException, similar to what other methods in Frontend do. Adds getThriftDbs() in Frontend to directly return the thrift db list so JniFrontend can use it directly and the retry can be added inside Frontend.java. TestAuthorization.test_local_catalog_show_dbs_with_transient_db is an existing test to verify a similar problem. Running this test with authorization disabled can reproduce the current bug. So this patch extracts the test code into TestLocalCatalogRetries._run_show_dbs_with_transient_db() and share it in both authz enabled and disabled tests. Tests - Ran TestLocalCatalogRetries.test_show_dbs_retry 60 times. Without the fix, it fails in about a dozen times. Change-Id: Ib337f88a2ac0f35142417f6cee51d30497f12845 Reviewed-on: http://gerrit.cloudera.org:8080/23402 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
196 lines
9.5 KiB
Python
196 lines
9.5 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Client tests for SQL statement authorization
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from getpass import getuser
|
|
import random
|
|
|
|
import pytest
|
|
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
|
from tests.common.file_utils import assert_file_in_dir_contains
|
|
from tests.common.test_result_verifier import error_msg_equal
|
|
from tests.common.test_vector import HS2
|
|
from tests.custom_cluster.test_local_catalog import TestLocalCatalogRetries
|
|
|
|
PRIVILEGES = ['all', 'alter', 'drop', 'insert', 'refresh', 'select']
|
|
ADMIN = "admin"
|
|
|
|
|
|
class TestAuthorization(CustomClusterTestSuite):
|
|
|
|
@classmethod
|
|
def default_test_protocol(cls):
|
|
return HS2
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
"--server_name=server1 --authorization_policy_file=ignored_file",
|
|
impala_log_dir="{deprecated_flags}", tmp_dir_placeholders=['deprecated_flags'],
|
|
disable_log_buffering=True)
|
|
def test_deprecated_flags(self):
|
|
assert_file_in_dir_contains(self.impala_log_dir, "Ignoring removed flag "
|
|
"authorization_policy_file")
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--min_privilege_set_for_show_stmts=select",
|
|
catalogd_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger")
|
|
def test_ranger_show_iceberg_metadata_tables(self, unique_name):
|
|
unique_db = unique_name + "_db"
|
|
tbl_name = "ice"
|
|
full_tbl_name = "{}.{}".format(unique_db, tbl_name)
|
|
non_existent_suffix = "_a"
|
|
non_existent_tbl_name = full_tbl_name + non_existent_suffix
|
|
priv = "select"
|
|
user = getuser()
|
|
query = "show metadata tables in {}".format(full_tbl_name)
|
|
query_non_existent = "show metadata tables in {}".format(non_existent_tbl_name)
|
|
admin_client = self.create_impala_client(user=ADMIN)
|
|
user_client = self.create_impala_client(user=user)
|
|
try:
|
|
admin_client.execute("drop database if exists {} cascade".format(unique_db))
|
|
admin_client.execute("create database {}".format(unique_db))
|
|
admin_client.execute(
|
|
"create table {} (i int) stored as iceberg".format(full_tbl_name))
|
|
|
|
# Check that when the user has no privileges on the database, the error is the same
|
|
# if we try an existing or a non-existing table.
|
|
exc1_str = str(self.execute_query_expect_failure(user_client, query))
|
|
exc2_str = str(self.execute_query_expect_failure(user_client, query_non_existent))
|
|
assert error_msg_equal(exc1_str, exc2_str)
|
|
assert "AuthorizationException" in exc1_str
|
|
assert "does not have privileges to access"
|
|
|
|
# Check that there is no error when the user has access to the table.
|
|
admin_client.execute("grant {priv} on database {db} to user {target_user}".format(
|
|
priv=priv, db=unique_db, target_user=user))
|
|
self.execute_query_expect_success(user_client, query)
|
|
finally:
|
|
admin_client.execute(
|
|
"revoke {priv} on database {db} from user {target_user}".format(
|
|
priv=priv, db=unique_db, target_user=user))
|
|
admin_client.execute("drop database if exists {} cascade".format(unique_db))
|
|
|
|
@staticmethod
|
|
def _verify_show_dbs(result, unique_name, visibility_privileges=PRIVILEGES):
|
|
""" Helper function for verifying the results of SHOW DATABASES below.
|
|
Only show databases with privileges implying any of the visibility_privileges.
|
|
"""
|
|
for priv in PRIVILEGES:
|
|
# Result lines are in the format of "db_name\tdb_comment"
|
|
db_name = 'db_%s_%s\t' % (unique_name, priv)
|
|
if priv != 'all' and priv not in visibility_privileges:
|
|
assert db_name not in result.data
|
|
else:
|
|
assert db_name in result.data
|
|
|
|
def _test_ranger_show_stmts_helper(self, unique_name, visibility_privileges):
|
|
unique_db = unique_name + "_db"
|
|
admin_client = self.create_impala_client(user=ADMIN)
|
|
try:
|
|
admin_client.execute("drop database if exists %s cascade" % unique_db)
|
|
admin_client.execute("create database %s" % unique_db)
|
|
for priv in PRIVILEGES:
|
|
admin_client.execute("create database db_%s_%s" % (unique_name, priv))
|
|
admin_client.execute("grant {0} on database db_{1}_{2} to user {3}"
|
|
.format(priv, unique_name, priv, getuser()))
|
|
admin_client.execute("create table %s.tbl_%s (i int)" % (unique_db, priv))
|
|
admin_client.execute("grant {0} on table {1}.tbl_{2} to user {3}"
|
|
.format(priv, unique_db, priv, getuser()))
|
|
|
|
# Admin can still see all the databases and tables
|
|
result = admin_client.execute("show databases")
|
|
TestAuthorization._verify_show_dbs(result, unique_name)
|
|
result = admin_client.execute("show tables in %s" % unique_db)
|
|
assert result.data == ["tbl_%s" % p for p in PRIVILEGES]
|
|
|
|
# Check SHOW DATABASES and SHOW TABLES using another username
|
|
result = self.client.execute("show databases")
|
|
TestAuthorization._verify_show_dbs(result, unique_name, visibility_privileges)
|
|
result = self.client.execute("show tables in %s" % unique_db)
|
|
# Only show tables with privileges implying any of the visibility privileges
|
|
assert 'tbl_all' in result.data # ALL can imply to any privilege
|
|
for p in visibility_privileges:
|
|
assert 'tbl_%s' % p in result.data
|
|
finally:
|
|
admin_client.execute("drop database if exists %s cascade" % unique_db)
|
|
for priv in PRIVILEGES:
|
|
admin_client.execute(
|
|
"drop database if exists db_%s_%s cascade" % (unique_name, priv))
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--min_privilege_set_for_show_stmts=select",
|
|
catalogd_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger")
|
|
def test_ranger_show_stmts_with_select(self, unique_name):
|
|
self._test_ranger_show_stmts_helper(unique_name, ['select'])
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--min_privilege_set_for_show_stmts=select,insert",
|
|
catalogd_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger")
|
|
def test_ranger_show_stmts_with_select_insert(self, unique_name):
|
|
self._test_ranger_show_stmts_helper(unique_name, ['select', 'insert'])
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--min_privilege_set_for_show_stmts=any",
|
|
catalogd_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger")
|
|
def test_ranger_show_stmts_with_any(self, unique_name):
|
|
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--num_check_authorization_threads=%d" % (random.randint(2, 128)),
|
|
catalogd_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger")
|
|
def test_num_check_authorization_threads_with_ranger(self, unique_name):
|
|
self._test_ranger_show_stmts_helper(unique_name, PRIVILEGES)
|
|
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--use_local_catalog=true",
|
|
catalogd_args="--server-name=server1 --ranger_service_type=hive "
|
|
"--ranger_app_id=impala --authorization_provider=ranger "
|
|
"--catalog_topic_mode=minimal")
|
|
def test_local_catalog_show_dbs_with_transient_db(self, unique_name):
|
|
"""Regression test for IMPALA-13170"""
|
|
# Starts a background thread to create+drop the transient db.
|
|
# Use admin user to have create+drop privileges.
|
|
unique_database = unique_name + "_db"
|
|
admin_client = self.create_impala_client(user=ADMIN)
|
|
TestLocalCatalogRetries._run_show_dbs_with_transient_db(
|
|
unique_database, admin_client, self.client)
|