mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
This addendum logs the exception thrown in the runtime profile under the CalciteFailureReason key. Testing: test_ranger.py uses this. Change-Id: Ia18a52c488f9c73d51690997b277fd8e918c645f Reviewed-on: http://gerrit.cloudera.org:8080/23686 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
3708 lines
177 KiB
Python
3708 lines
177 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Client tests for SQL statement authorization
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from getpass import getuser
|
|
import grp
|
|
import json
|
|
import logging
|
|
import os
|
|
from subprocess import check_call
|
|
import tempfile
|
|
from time import sleep
|
|
|
|
from builtins import map, range
|
|
import pytest
|
|
import requests
|
|
|
|
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite, HIVE_CONF_DIR
|
|
from tests.common.file_utils import copy_files_to_hdfs_dir
|
|
from tests.common.iceberg_rest_server import IcebergRestServer
|
|
from tests.common.skip import SkipIf, SkipIfFS, SkipIfHive2
|
|
from tests.common.test_dimensions import (
|
|
create_client_protocol_dimension,
|
|
create_orc_dimension,
|
|
)
|
|
from tests.common.test_vector import HS2, ImpalaTestVector
|
|
from tests.shell.util import run_impala_shell_cmd
|
|
from tests.util.calculation_util import get_random_id
|
|
from tests.util.filesystem_utils import WAREHOUSE, WAREHOUSE_PREFIX
|
|
from tests.util.hdfs_util import NAMENODE
|
|
from tests.util.iceberg_util import get_snapshots
|
|
from tests.util.parse_util import bytes_to_str
|
|
|
|
# Principal names used by the authorization tests.
ADMIN = "admin"
OWNER_USER = getuser()
NON_OWNER = "non_owner"

# Error message texts about missing GRANT / REVOKE permission.
ERROR_GRANT = "User doesn't have necessary permission to grant access"
ERROR_REVOKE = "User doesn't have necessary permission to revoke access"

# Credentials, base URL and HTTP headers used for the Ranger REST requests below.
RANGER_AUTH = ("admin", "admin")
RANGER_HOST = "http://localhost:6080"
REST_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json",
}

# Startup flags for clusters running with the legacy catalog
# (--use_local_catalog=false / --catalog_topic_mode=full).
LEGACY_CATALOG_IMPALAD_ARGS = (
    "--server-name=server1 --ranger_service_type=hive "
    "--ranger_app_id=impala --authorization_provider=ranger "
    "--use_local_catalog=false")
LEGACY_CATALOG_CATALOGD_ARGS = (
    "--server-name=server1 --ranger_service_type=hive "
    "--ranger_app_id=impala --authorization_provider=ranger "
    "--catalog_topic_mode=full")

# Startup flags for clusters running with the local catalog
# (--use_local_catalog=true / --catalog_topic_mode=minimal).
IMPALAD_ARGS = (
    "--server-name=server1 --ranger_service_type=hive "
    "--ranger_app_id=impala --authorization_provider=ranger --use_local_catalog=true")
CATALOGD_ARGS = (
    "--server-name=server1 --ranger_service_type=hive "
    "--ranger_app_id=impala --authorization_provider=ranger --catalog_topic_mode=minimal")

LOG = logging.getLogger('impala_test_suite')
|
|
|
|
|
|
class TestRanger(CustomClusterTestSuite):
|
|
"""
|
|
Base class for Apache Ranger integration test with Apache Impala.
|
|
This class only contains common helper or base test method.
|
|
Pytest method should be declared in either of TestRangerIndependent,
|
|
TestRangerLegacyCatalog, or TestRangerLocalCatalog.
|
|
"""
|
|
|
|
@classmethod
def default_test_protocol(cls):
  """Return the client protocol these tests use by default (HS2)."""
  protocol = HS2
  return protocol
|
|
|
|
def _test_grant_revoke(self, unique_name, refresh_statements):
  """Grant and revoke SELECT on a scratch database and verify SHOW GRANT output.

  The scenario runs once for every statement in 'refresh_statements' and, within
  each, once for the current OS user ("USER") and once for the group named after
  that user ("GROUP"). 'unique_name' is the prefix of the scratch database
  ('<unique_name>_db') and table ('<unique_name>_tbl'). The privilege is revoked
  and the database dropped in a 'finally' block even when a check fails.
  """
  owner = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  db_name = unique_name + "_db"
  tbl_name = unique_name + "_tbl"
  owner_group = grp.getgrnam(getuser()).gr_name
  principals = [(owner, "USER"), (owner_group, "GROUP")]

  drop_db_stmt = "drop database if exists {0} cascade".format(db_name)

  for refresh_stmt in refresh_statements:
    for ident, kw in principals:
      grant_stmt = "grant select on database {0} to {1} {2}".format(
          db_name, kw, ident)
      revoke_stmt = "revoke select on database {0} from {1} {2}".format(
          db_name, kw, ident)
      show_stmt = "show grant {0} {1} on database {2}".format(kw, ident, db_name)
      try:
        # Set-up temp database/table
        admin_client.execute(drop_db_stmt)
        admin_client.execute("create database {0}".format(db_name))
        admin_client.execute(
            "create table {0}.{1} (x int)".format(db_name, tbl_name))

        # After the grant the principal must see SELECT on the database.
        self.execute_query_expect_success(admin_client, grant_stmt)
        self._refresh_authorization(admin_client, refresh_stmt)
        after_grant = self.execute_query(show_stmt)
        TestRanger._check_privileges(after_grant, [
            [kw, ident, db_name, "", "", "", "", "", "*", "select", "false"],
            [kw, ident, db_name, "*", "*", "", "", "", "", "select", "false"]])

        # After the revoke nothing must be left.
        self.execute_query_expect_success(admin_client, revoke_stmt)
        self._refresh_authorization(admin_client, refresh_stmt)
        after_revoke = self.execute_query(show_stmt)
        TestRanger._check_privileges(after_revoke, [])
      finally:
        admin_client.execute(revoke_stmt)
        admin_client.execute(drop_db_stmt)
|
|
|
|
def _update_privileges_and_verify(self, admin_client, update_stmt, show_grant_stmt,
                                  expected_privileges):
  """Run 'update_stmt' as admin, then check what 'show_grant_stmt' reports.

  'update_stmt' is executed through 'admin_client'; 'show_grant_stmt' is executed
  through self.client and its result is compared against 'expected_privileges' by
  TestRanger._check_privileges().
  """
  admin_client.execute(update_stmt)
  shown = self.client.execute(show_grant_stmt)
  TestRanger._check_privileges(shown, expected_privileges)
|
|
|
|
def _test_show_grant_without_on(self, kw, ident):
  """Expect SHOW GRANT for principal type 'kw' and name 'ident' to fail when no
  ON clause is given."""
  stmt = "show grant {0} {1}".format(kw, ident)
  self.execute_query_expect_failure(self.client, stmt)
|
|
|
|
def _test_show_grant_user_group(self, admin_client, user, group, unique_db):
  """Verify that SHOW GRANT USER also lists privileges granted to a group.

  First checks that 'user' has no privilege on 'unique_db', then grants SELECT on
  the database to 'group' and expects SHOW GRANT USER to report "GROUP" rows. The
  group grant is always revoked at the end.
  """
  show_stmt = "show grant user {0} on database {1}".format(user, unique_db)
  try:
    before_grant = self.client.execute(show_stmt)
    TestRanger._check_privileges(before_grant, [])

    admin_client.execute(
        "grant select on database {0} to group {1}".format(unique_db, group))
    after_grant = self.client.execute(show_stmt)
    # NOTE(review): the expected principal name is 'user' although the grant went
    # to 'group'; presumably callers pass identical names - confirm.
    TestRanger._check_privileges(after_grant, [
        ["GROUP", user, unique_db, "", "", "", "", "", "*", "select", "false"],
        ["GROUP", user, unique_db, "*", "*", "", "", "", "", "select", "false"]])
  finally:
    admin_client.execute(
        "revoke select on database {0} from group {1}".format(unique_db, group))
|
|
|
|
def _test_show_grant_mask(self, admin_client, user):
  """Verify how server-level privileges of 'user' are listed by SHOW GRANT.

  Three stages are checked: the individual privileges (everything except
  RWSTORAGE), the same plus RWSTORAGE, and finally ALL, after which the individual
  privileges are expected to be replaced by "all" rows. Every grant is revoked in
  the 'finally' block.
  """
  privs_excl_rwstorage = ["select", "insert", "create", "alter", "drop", "refresh"]
  show_stmt = "show grant user {0} on server".format(user)

  # The seven resource fields that sit between the principal and the privilege
  # name in an expected row. A server-level grant is reported once per pattern.
  server_patterns = [
      ["", "", "", "*", "", "", ""],
      ["*", "", "", "", "", "", "*"],
      ["*", "*", "*", "", "", "", ""]]

  def pattern_rows(privileges):
    # Rows are grouped by pattern first and by privilege second.
    return [["USER", user] + pattern + [privilege, "false"]
            for pattern in server_patterns
            for privilege in privileges]

  def rwstorage_row():
    return ["USER", user, "", "", "", "", "*", "*", "", "rwstorage", "false"]

  def verify(expected):
    shown = self.client.execute(show_stmt)
    TestRanger._check_privileges(shown, expected)

  try:
    for privilege in privs_excl_rwstorage:
      admin_client.execute("grant {0} on server to user {1}".format(privilege, user))
    # The expected rows list the privileges in alphabetical order.
    listed = sorted(privs_excl_rwstorage)
    verify(pattern_rows(listed))

    # GRANT RWSTORAGE ON SERVER additionally grants the RWSTORAGE privilege on all
    # storage types and all storage URI's.
    admin_client.execute("grant rwstorage on server to user {0}".format(user))
    verify([rwstorage_row()] + pattern_rows(listed))

    admin_client.execute("grant all on server to user {0}".format(user))
    verify([rwstorage_row()] + pattern_rows(["all"]))
  finally:
    admin_client.execute("revoke all on server from user {0}".format(user))
    admin_client.execute("revoke rwstorage on server from user {0}".format(user))
    for privilege in privs_excl_rwstorage:
      admin_client.execute("revoke {0} on server from user {1}".format(privilege, user))
|
|
|
|
def _test_show_grant_mask_on_udf(self, admin_client, kw, id, unique_database, udf):
  """Verify SHOW GRANT on a user-defined function as privileges accumulate.

  CREATE, DROP and SELECT are granted one after another to principal 'kw' 'id' on
  'unique_database'.'udf' and must each show up; after granting ALL only the "all"
  row is expected. All four privileges are revoked in the 'finally' block.
  """
  fn_name = "{0}.{1}".format(unique_database, udf)
  show_stmt = "show grant {0} {1} on user_defined_fn {2}".format(kw, id, fn_name)

  def udf_row(privilege):
    return [kw, id, unique_database, "", "", "", "", "", udf, privilege, "false"]

  def grant_stmt(privilege):
    return "grant {0} on user_defined_fn {1} to {2} {3}".format(
        privilege, fn_name, kw, id)

  try:
    # Grant CREATE, DROP and SELECT in turn; each step adds one expected row.
    granted = []
    for privilege in ("create", "drop", "select"):
      granted.append(privilege)
      self._update_privileges_and_verify(
          admin_client, grant_stmt(privilege), show_stmt,
          [udf_row(p) for p in granted])

    # Grant the ALL privilege and verify other privileges are masked.
    self._update_privileges_and_verify(
        admin_client, grant_stmt("all"), show_stmt, [udf_row("all")])
  finally:
    for privilege in ("create", "drop", "select", "all"):
      admin_client.execute(
          "revoke {0} on user_defined_fn {1} from {2} {3}".format(
              privilege, fn_name, kw, id))
|
|
|
|
def _test_show_grant_basic(self, admin_client, kw, id, unique_database, unique_table,
                           udf):
  """Grant/revoke on each resource kind and verify SHOW GRANT after every step.

  Covers server, URI, storage handler URI, database, table, user-defined function
  (with and without wildcards) and column for principal 'kw' 'id'. Each update is
  executed and checked through _update_privileges_and_verify(). Every privilege
  that may have been granted is revoked in the 'finally' block.
  """
  uri = WAREHOUSE_PREFIX + '/tmp'
  database = 'functional'
  table = 'alltypes'
  storagehandler_uri = 'kudu://localhost/impala::tpch_kudu.nation'

  principal = "{0} {1}".format(kw, id)

  # Resource clauses as they appear after "ON" in GRANT/REVOKE/SHOW GRANT.
  server_res = "server"
  uri_res = "uri '{0}'".format(uri)
  handler_res = "storagehandler_uri '{0}'".format(storagehandler_uri)
  shared_db_res = "database {0}".format(database)
  shared_tbl_res = "table {0}.{1}".format(database, table)
  db_res = "database {0}".format(unique_database)
  tbl_res = "table {0}.{1}".format(unique_database, unique_table)
  col_res = "column {0}.{1}.x".format(unique_database, unique_table)
  fn_any_any_res = "user_defined_fn `*`.`*`"
  fn_db_any_res = "user_defined_fn {0}.`*`".format(unique_database)
  fn_any_udf_res = "user_defined_fn `*`.{0}".format(udf)
  fn_db_udf_res = "user_defined_fn {0}.{1}".format(unique_database, udf)

  def grant(privilege, resource):
    return "grant {0} on {1} to {2}".format(privilege, resource, principal)

  def revoke(privilege, resource):
    return "revoke {0} on {1} from {2}".format(privilege, resource, principal)

  def verify(update_stmt, shown_resource, expected):
    show_stmt = "show grant {0} on {1}".format(principal, shown_resource)
    self._update_privileges_and_verify(admin_client, update_stmt, show_stmt, expected)

  try:
    # Grant server privileges and verify
    verify(grant("all", server_res), server_res, [
        [kw, id, "", "", "", "", "*", "*", "", "rwstorage", "false"],
        [kw, id, "", "", "", "*", "", "", "", "all", "false"],
        [kw, id, "*", "", "", "", "", "", "*", "all", "false"],
        [kw, id, "*", "*", "*", "", "", "", "", "all", "false"]])

    # Revoke server privileges and verify
    verify(revoke("all", server_res), server_res, [])

    # Grant uri privileges and verify
    verify(grant("all", uri_res), uri_res, [
        [kw, id, "", "", "", "{0}{1}".format(NAMENODE, '/tmp'), "", "", "", "all",
         "false"]])

    # Revoke uri privileges and verify
    verify(revoke("all", uri_res), uri_res, [])

    # Grant storage handler URI privilege and verify
    verify(grant("rwstorage", handler_res), handler_res, [
        [kw, id, "", "", "", "", "kudu", "localhost/impala::tpch_kudu.nation", "",
         "rwstorage", "false"]])

    # Grant the rwstorage privilege on a database does not result in the grantee being
    # granted the privilege on the database.
    verify(grant("rwstorage", shared_db_res), shared_db_res, [])

    # Grant the rwstorage privilege on a table does not result in the grantee being
    # granted the privilege on the table.
    verify(grant("rwstorage", shared_tbl_res), shared_tbl_res, [])

    # Revoke storage handler URI privilege and verify
    verify(revoke("rwstorage", handler_res), handler_res, [])

    # Grant database privileges and verify
    verify(grant("select", db_res), db_res, [
        [kw, id, unique_database, "", "", "", "", "", "*", "select", "false"],
        [kw, id, unique_database, "*", "*", "", "", "", "", "select", "false"]])

    # Revoke database privileges and verify
    verify(revoke("select", db_res), db_res, [])

    # Grant table privileges and verify
    verify(grant("select", tbl_res), tbl_res, [
        [kw, id, unique_database, unique_table, "*", "", "", "", "", "select",
         "false"]])

    # Revoke table privileges and verify
    verify(revoke("select", tbl_res), tbl_res, [])

    # Grant a privilege on a UDF with a wildcard for both database name and function
    # name.
    verify(grant("select", fn_any_any_res), fn_any_any_res, [
        [kw, id, "*", "", "", "", "", "", "*", "select", "false"]])

    # Revoke the granted privilege and verify.
    verify(revoke("select", fn_any_any_res), fn_any_any_res, [])

    # Grant a privilege on a UDF with functional name in wildcard.
    verify(grant("select", fn_db_any_res), fn_db_any_res, [
        [kw, id, unique_database, "", "", "", "", "", "*", "select", "false"]])

    # Revoke the granted privilege and verify.
    verify(revoke("select", fn_db_any_res), fn_db_any_res, [])

    # Grant a privilege on a UDF with a wildcard for database name but a
    # non-wildcard for functional name.
    verify(grant("select", fn_any_udf_res), fn_any_udf_res, [
        [kw, id, "*", "", "", "", "", "", udf, "select", "false"]])

    # Revoke the granted privilege and verify.
    verify(revoke("select", fn_any_udf_res), fn_any_udf_res, [])

    # Grant a privilege on a UDF and verify.
    verify(grant("select", fn_db_udf_res), fn_db_udf_res, [
        [kw, id, unique_database, "", "", "", "", "", udf, "select", "false"]])

    # Revoke the granted privilege and verify.
    verify(revoke("select", fn_db_udf_res), fn_db_udf_res, [])

    # Grant column privileges and verify. The privilege is granted on the table
    # with a column list but shown on the column resource.
    verify(grant("select(x)", tbl_res), col_res, [
        [kw, id, unique_database, unique_table, "x", "", "", "", "", "select",
         "false"]])

    # Revoke column privileges and verify
    verify(revoke("select(x)", tbl_res), col_res, [])
  finally:
    # Revoke everything the scenario may have granted, in the order it was granted.
    cleanup = [
        ("all", server_res),
        ("all", uri_res),
        ("rwstorage", handler_res),
        ("select", db_res),
        ("select", tbl_res),
        ("select", fn_any_any_res),
        ("select", fn_db_any_res),
        ("select", fn_any_udf_res),
        ("select", fn_db_udf_res),
        ("select(x)", tbl_res)]
    for privilege, resource in cleanup:
      admin_client.execute(revoke(privilege, resource))
|
|
|
|
def _test_show_grant_inherited(self, admin_client, kw, id, unique_database,
                               unique_table):
  """Verify that SHOW GRANT reports privileges inherited from enclosing resources.

  Starts with SELECT on server for principal 'kw' 'id', then adds CREATE on the
  database, INSERT on the table, SELECT on column x, SELECT on the table and ALL
  on the table, checking after each step what SHOW GRANT lists for the database,
  table or column. All grants are revoked in the 'finally' block.
  """
  principal = "{0} {1}".format(kw, id)
  tbl_name = "{0}.{1}".format(unique_database, unique_table)

  show_server = "show grant {0} on server".format(principal)
  show_db = "show grant {0} on database {1}".format(principal, unique_database)
  show_tbl = "show grant {0} on table {1}".format(principal, tbl_name)
  show_col = "show grant {0} on column {1}.x".format(principal, tbl_name)

  # Expected rows, kept as tuples so that every check receives fresh lists.
  server_uri_select = (kw, id, "", "", "", "*", "", "", "", "select", "false")
  server_fn_select = (kw, id, "*", "", "", "", "", "", "*", "select", "false")
  server_col_select = (kw, id, "*", "*", "*", "", "", "", "", "select", "false")
  db_fn_create = (kw, id, unique_database, "", "", "", "", "", "*", "create", "false")
  db_col_create = (kw, id, unique_database, "*", "*", "", "", "", "", "create", "false")
  tbl_insert = (
      kw, id, unique_database, unique_table, "*", "", "", "", "", "insert", "false")
  tbl_all = (kw, id, unique_database, unique_table, "*", "", "", "", "", "all", "false")
  col_select = (
      kw, id, unique_database, unique_table, "x", "", "", "", "", "select", "false")

  def expect(show_stmt, *rows):
    shown = self.client.execute(show_stmt)
    TestRanger._check_privileges(shown, [list(row) for row in rows])

  def on_table(privilege, verb, direction):
    return "{0} {1} on table {2} {3} {4}".format(
        verb, privilege, tbl_name, direction, principal)

  try:
    # Grant the select privilege on server
    admin_client.execute("grant select on server to {0}".format(principal))

    # Verify the privileges are correctly added
    expect(show_server, server_uri_select, server_fn_select, server_col_select)

    # Verify the highest level of resource that contains the specified resource could
    # be computed when the specified resource is a database
    expect(show_db, server_fn_select, server_col_select)

    # Verify the highest level of resource that contains the specified resource could
    # be computed when the specified resource is a table
    expect(show_tbl, server_col_select)

    # Verify the highest level of resource that contains the specified resource could
    # be computed when the specified resource is a column
    expect(show_col, server_col_select)

    # Grant the create privilege on database and verify
    admin_client.execute(
        "grant create on database {0} to {1}".format(unique_database, principal))
    expect(show_db, server_fn_select, server_col_select, db_fn_create, db_col_create)

    # Grant the insert privilege on table and verify
    admin_client.execute(on_table("insert", "grant", "to"))
    expect(show_tbl, server_col_select, db_col_create, tbl_insert)

    # Grant the select privilege on column and verify
    admin_client.execute(on_table("select(x)", "grant", "to"))
    expect(show_col, db_col_create, tbl_insert, col_select)

    # The insert privilege on table masks the select privilege just added
    admin_client.execute(on_table("select", "grant", "to"))
    expect(show_col, db_col_create, tbl_insert, col_select)

    # The all privilege on table masks the privileges of insert and select, but not the
    # select privilege on column.
    admin_client.execute(on_table("all", "grant", "to"))
    expect(show_col, tbl_all, col_select)

  finally:
    admin_client.execute("revoke select on server from {0}".format(principal))
    admin_client.execute(
        "revoke create on database {0} from {1}".format(unique_database, principal))
    for privilege in ("insert", "select(x)", "select", "all"):
      admin_client.execute(on_table(privilege, "revoke", "from"))
|
|
|
|
@staticmethod
def _grant_ranger_privilege(user, resource, access):
  """Grant privileges to 'user' directly through the Ranger REST API.

  'resource' and 'access' are sent unchanged as the "resource" and "accessTypes"
  fields of the grant request for the 'test_impala' service. Fails with an
  AssertionError carrying the response body if Ranger does not answer with a 2xx
  status.
  """
  data = {
    "grantor": ADMIN,
    "grantorGroups": [],
    "resource": resource,
    "users": [user],
    "groups": [],
    "accessTypes": access,
    "delegateAdmin": "false",
    "enableAudit": "true",
    "replaceExistingPermissions": "false",
    "isRecursive": "false",
    "clusterName": "server1"
  }
  r = requests.post("{0}/service/plugins/services/grant/test_impala?pluginId=impala"
                    .format(RANGER_HOST),
                    auth=RANGER_AUTH, json=data, headers=REST_HEADERS)
  # Include the response body so a rejected grant is diagnosable from the test log.
  assert 200 <= r.status_code < 300, r.content
|
|
|
|
@staticmethod
def _revoke_ranger_privilege(user, resource, access):
  """Revoke privileges from 'user' directly through the Ranger REST API.

  'resource' and 'access' are sent unchanged as the "resource" and "accessTypes"
  fields of the revoke request for the 'test_impala' service. Fails with an
  AssertionError carrying the response body if Ranger does not answer with a 2xx
  status.
  """
  data = {
    "grantor": ADMIN,
    "grantorGroups": [],
    "resource": resource,
    "users": [user],
    "groups": [],
    "accessTypes": access,
    "delegateAdmin": "false",
    "enableAudit": "true",
    "replaceExistingPermissions": "false",
    "isRecursive": "false",
    "clusterName": "server1"
  }
  r = requests.post("{0}/service/plugins/services/revoke/test_impala?pluginId=impala"
                    .format(RANGER_HOST),
                    auth=RANGER_AUTH, json=data, headers=REST_HEADERS)
  # Include the response body so a rejected revoke is diagnosable from the test log.
  assert 200 <= r.status_code < 300, r.content
|
|
|
|
@staticmethod
def _get_ranger_privileges_db(user, db):
  """Return the access types Ranger policies give 'user' on database 'db'.

  Scans every policy returned by _get_ranger_privileges(), keeps those whose
  "database" resource lists 'db', and collects the "type" of each access in the
  policy items that name 'user'. Duplicates are not removed.
  """
  access_types = []

  for policy in TestRanger._get_ranger_privileges():
    resources = policy["resources"]
    if "database" not in resources or db not in resources["database"]["values"]:
      continue
    for policy_item in policy["policyItems"]:
      if user in policy_item["users"]:
        access_types.extend(access["type"] for access in policy_item["accesses"])

  return access_types
|
|
|
|
@staticmethod
def _get_ranger_privileges():
  """Fetch all policies from the Ranger REST API.

  Returns the value of the "policies" field of the JSON response. Fails with an
  AssertionError carrying the response body if Ranger does not answer with a 2xx
  status, instead of failing later while parsing an error payload.
  """
  r = requests.get("{0}/service/plugins/policies"
                   .format(RANGER_HOST),
                   auth=RANGER_AUTH, headers=REST_HEADERS)
  assert 300 > r.status_code >= 200, r.content
  return json.loads(r.content)["policies"]
|
|
|
|
def _add_ranger_user(self, user):
  """Create a Ranger user named 'user' and return the id Ranger assigned to it.

  The user is created with role ROLE_USER and a fixed test password. Fails with an
  AssertionError carrying the response body if Ranger does not answer with a 2xx
  status, instead of failing later on a missing "id" field.
  """
  data = {"name": user, "password": "Password123", "userRoleList": ["ROLE_USER"]}
  r = requests.post("{0}/service/xusers/secure/users".format(RANGER_HOST),
                    auth=RANGER_AUTH,
                    json=data, headers=REST_HEADERS)
  assert 300 > r.status_code >= 200, r.content
  return json.loads(r.content)["id"]
|
|
|
|
def _remove_ranger_user(self, id):
  """Force-delete the Ranger user with the given 'id' and assert a 2xx response."""
  url = "{0}/service/xusers/users/{1}?forceDelete=true".format(RANGER_HOST, id)
  r = requests.delete(url, auth=RANGER_AUTH)
  assert 200 <= r.status_code < 300
|
|
|
|
@staticmethod
def _add_column_masking_policy(
    policy_name, user, db, table, column, mask_type, value_expr=None):
  """Create a column masking policy in Ranger and return its policy id.

  The policy applies to 'db'.'table'.'column' for SELECT access by 'user'.
  'mask_type' and 'value_expr' are sent as "dataMaskType" and "valueExpr" of the
  policy item. Asserts a 2xx response, reporting the response body otherwise.
  """
  def exact_resource(value):
    # A resource entry matching exactly one, non-excluded, non-recursive value.
    return {"values": [value], "isExcludes": False, "isRecursive": False}

  mask_item = {
    "accesses": [{"type": "select", "isAllowed": True}],
    "users": [user],
    "dataMaskInfo": {"dataMaskType": mask_type, "valueExpr": value_expr}
  }
  data = {
    "name": policy_name,
    "policyType": 1,
    "serviceType": "hive",
    "service": "test_impala",
    "resources": {
      "database": exact_resource(db),
      "table": exact_resource(table),
      "column": exact_resource(column)
    },
    "dataMaskPolicyItems": [mask_item]
  }
  policy_url = "{0}/service/public/v2/api/policy".format(RANGER_HOST)
  r = requests.post(policy_url, auth=RANGER_AUTH, json=data, headers=REST_HEADERS)
  assert 200 <= r.status_code < 300, r.content
  policy_id = json.loads(r.content)["id"]
  LOG.info("Added column masking policy " + str(policy_id))
  return policy_id
|
|
|
|
@staticmethod
def _add_row_filtering_policy(policy_name, user, db, table, filter_expr):
  """Adds a row filtering policy and returns the policy id.

  Convenience wrapper around _add_multiuser_row_filtering_policy() for a single
  'user' with a single 'filter_expr' on 'db'.'table'.
  """
  # Propagate the result so the caller receives the value the docstring promises.
  return TestRanger._add_multiuser_row_filtering_policy(
      policy_name, db, table, [user], [filter_expr])
|
|
|
|
@staticmethod
def _add_multiuser_row_filtering_policy(policy_name, db, table, users, filters):
  """Adds a row filtering policy on 'db'.'table' and returns the policy id.

  If len(filters) == 1, all users use the same filter. Otherwise,
  users[0] has filters[0], users[1] has filters[1] and so on.

  'users' must not be empty and 'filters' must hold either one filter or exactly
  one filter per user.
  """
  assert len(users) > 0
  assert len(filters) == 1 or len(users) == len(filters)

  if len(filters) == 1:
    # One policy item shared by every user.
    assignments = [(users, filters[0])]
  else:
    # One policy item per (user, filter) pair.
    assignments = [([user], filter_expr) for user, filter_expr in zip(users, filters)]

  items = [{
    "accesses": [
      {
        "type": "select",
        "isAllowed": True
      }
    ],
    "users": item_users,
    "rowFilterInfo": {"filterExpr": filter_expr}
  } for item_users, filter_expr in assignments]

  # Propagate the result so the caller receives the value the docstring promises.
  return TestRanger._add_row_filtering_policy_with_items(policy_name, db, table, items)
|
|
|
|
@staticmethod
def _add_row_filtering_policy_with_items(policy_name, db, table, items):
    """Adds a row filtering policy and returns the policy id.

    The policy is posted to the 'test_impala' service under the name 'policy_name'
    for the table 'db'.'table', with 'items' sent unchanged as its
    'rowFilterPolicyItems'. Asserts that Ranger answers with a 2xx status code."""
    resources = {}
    for resource_name, value in (("database", db), ("table", table)):
        resources[resource_name] = {
            "values": [value],
            "isExcludes": False,
            "isRecursive": False,
        }
    payload = {
        "name": policy_name,
        "policyType": 2,
        "serviceType": "hive",
        "service": "test_impala",
        "resources": resources,
        "rowFilterPolicyItems": items,
    }
    url = "{0}/service/public/v2/api/policy".format(RANGER_HOST)
    response = requests.post(url, auth=RANGER_AUTH, json=payload,
                             headers=REST_HEADERS)
    assert 300 > response.status_code >= 200, response.content
    LOG.info("Added row filtering policy on table {0}.{1} for using items {2}"
             .format(db, table, items))
    return json.loads(response.content)["id"]
|
|
|
|
@staticmethod
def _add_deny_policy(policy_name, user, db, table, column):
    """Adds a deny policy and returns the policy id.

    The policy is posted to the 'test_impala' service under the name 'policy_name'.
    It has no allow items and a single deny item for 'select' accesses by 'user' on
    the column 'db'.'table'.'column'. Asserts that Ranger answers with a 2xx status
    code."""
    def only(value):
        # Resource entry that lists exactly 'value'.
        return {"values": [value], "isExcludes": False, "isRecursive": False}

    deny_item = {
        "accesses": [{"type": "select", "isAllowed": True}],
        "users": [user],
    }
    payload = {
        "name": policy_name,
        "policyType": 0,
        "serviceType": "hive",
        "service": "test_impala",
        "resources": {
            "database": only(db),
            "table": only(table),
            "column": only(column),
        },
        "policyItems": [],
        "denyPolicyItems": [deny_item],
    }
    url = "{0}/service/public/v2/api/policy".format(RANGER_HOST)
    response = requests.post(url, auth=RANGER_AUTH, json=payload,
                             headers=REST_HEADERS)
    assert 300 > response.status_code >= 200, response.content
    return json.loads(response.content)["id"]
|
|
|
|
@staticmethod
def _remove_policy(policy_name):
    """Deletes the policy named 'policy_name' from the 'test_impala' service.

    Raises an AssertionError carrying the response body if Ranger does not answer
    with a 2xx status code."""
    # Let requests build the query string so that a policy name containing
    # characters such as '&', '#' or '=' is escaped instead of corrupting the URL.
    r = requests.delete(
        "{0}/service/public/v2/api/policy".format(RANGER_HOST),
        params=[("servicename", "test_impala"), ("policyname", policy_name)],
        auth=RANGER_AUTH, headers=REST_HEADERS)
    assert 300 > r.status_code >= 200, bytes_to_str(r.content)
|
|
|
|
@staticmethod
|
|
def _check_privileges(result, expected):
|
|
def columns(row):
|
|
cols = row.split("\t")
|
|
return cols[0:len(cols) - 1]
|
|
|
|
assert list(map(columns, result.data)) == expected
|
|
|
|
def _refresh_authorization(self, client, statement):
|
|
if statement is not None:
|
|
self.execute_query_expect_success(client, statement)
|
|
|
|
def _run_query_as_user(self, query, username, expect_success):
|
|
"""Helper to run an input query as a given user."""
|
|
with self.create_impala_client(user=username) as impala_client:
|
|
if expect_success:
|
|
return self.execute_query_expect_success(
|
|
impala_client, query, query_options={'sync_ddl': 1})
|
|
return self.execute_query_expect_failure(impala_client, query)
|
|
|
|
def _run_query_with_client(self, query, impala_client, expect_success):
|
|
"""Helper to run an input query using a given impala_client."""
|
|
if expect_success:
|
|
return self.execute_query_expect_success(
|
|
impala_client, query, query_options={'sync_ddl': 1})
|
|
return self.execute_query_expect_failure(impala_client, query)
|
|
|
|
def _test_grant_multiple_columns(self, expected_num_policies):
    """Grants SELECT on every column of functional.alltypes in one statement and
    checks how many Ranger policies carry the resulting column privileges.

    The check is repeated for a user, a group and a role grantee. For each of them
    the number of distinct policies found by _get_ranger_policy_ids() over all the
    columns has to equal 'expected_num_policies'. The privilege is revoked again (and
    the role dropped) whether or not the check passes."""
    access_type = "select"
    db = "functional"
    tbl = "alltypes"
    cols = ["id", "bool_col", "tinyint_col", "smallint_col", "int_col", "bigint_col",
            "float_col", "double_col", "date_string_col", "string_col",
            "timestamp_col", "year", "month"]
    cols_str = ", ".join(cols)
    # (principal keyword in the statement, principal name, key of the principal list
    # in a Ranger policy item)
    test_data = [("user", "non_owner", "users"), ("group", "non_owner", "groups"),
                 ("role", "test_role", "roles")]

    # The context manager closes the client even if one of the checks fails.
    with self.create_impala_client(user=ADMIN) as admin_client:
        for kw, principal_name, principal_key in test_data:
            try:
                policy_ids = set()
                if kw == "role":
                    admin_client.execute("create role {0}".format(principal_name))
                admin_client.execute(
                    "grant {0}({1}) on table {2}.{3} to {4} {5}"
                    .format(access_type, cols_str, db, tbl, kw, principal_name))
                policies = TestRanger._get_ranger_privileges()
                for col in cols:
                    policy_ids.update(TestRanger._get_ranger_policy_ids(
                        policies, principal_name, principal_key, db, tbl, col,
                        access_type))
                # 'policy_ids' now holds every policy that grants 'access_type' on
                # at least one of the columns to 'principal_name'.
                assert len(policy_ids) == expected_num_policies
            finally:
                admin_client.execute(
                    "revoke {0}({1}) on table {2}.{3} from {4} {5}"
                    .format(access_type, cols_str, db, tbl, kw, principal_name))
                if kw == "role":
                    admin_client.execute("drop role {0}".format(principal_name))
|
|
|
|
@staticmethod
|
|
def _get_ranger_policy_ids(policies, principal_name, principal_key, db, tbl, col,
|
|
access_type):
|
|
"""Returns the set of Ranger policy id's that grant the privilege of 'access_type' on
|
|
the column 'db'.'tbl'.'col'. to the principal 'principal_name'."""
|
|
result = set()
|
|
|
|
for policy in policies:
|
|
id = policy["id"]
|
|
resources = policy["resources"]
|
|
if "database" in resources and db in resources["database"]["values"] \
|
|
and "table" in resources and tbl in resources["table"]["values"] \
|
|
and "column" in resources and col in resources["column"]["values"]:
|
|
for policy_items in policy["policyItems"]:
|
|
if principal_name in policy_items[principal_key]:
|
|
for access in policy_items["accesses"]:
|
|
if access_type in access["type"] and access["isAllowed"] is True:
|
|
result.add(id)
|
|
break
|
|
return result
|
|
|
|
def _test_grant_revoke_by_owner(self, unique_name):
    """Checks which GRANT and REVOKE statements the owner of a resource may run.

    OWNER_USER creates the database 'unique_name' + "_db" with a table and a UDF in
    it. Then, for a user, a group and a role grantee and for every privilege in
    'privileges', the database, table, column and UDF cases are delegated to the
    respective _test_grant_revoke_by_owner_on_*() helper. The database, the roles
    and the privileges granted to OWNER_USER are removed at the end."""
    non_owner_user = NON_OWNER
    non_owner_group = NON_OWNER
    grantee_role = "grantee_role"
    resource_owner_role = OWNER_USER
    admin_client = self.create_impala_client(user=ADMIN)
    user_client = self.create_impala_client(user=OWNER_USER)
    unique_database = unique_name + "_db"
    table_name = "tbl"
    column_names = ["a", "b"]
    # Fall back to an empty prefix: an unset FILESYSTEM_PREFIX would otherwise be
    # formatted into the URI as the literal string "None".
    udf_uri = "{0}{1}".format(os.getenv("FILESYSTEM_PREFIX", ""),
                              "/test-warehouse/impala-hive-udfs.jar")
    udf_name = "identity"
    test_data = [("USER", non_owner_user), ("GROUP", non_owner_group),
                 ("ROLE", grantee_role)]
    privileges = ["alter", "drop", "create", "insert", "select", "refresh"]

    try:
        admin_client.execute("create role {}".format(grantee_role))
        admin_client.execute("create role {}".format(resource_owner_role))
        admin_client.execute("grant create on server to user {0}".format(OWNER_USER))
        # A user has to be granted the ALL privilege on the URI of the UDF as well to
        # be able to create a UDF.
        admin_client.execute("grant all on uri '{0}' to user {1}"
                             .format(udf_uri, OWNER_USER))
        self._run_query_with_client("create database {0}".format(unique_database),
                                    user_client, True)
        self._run_query_with_client(
            "create table {0}.{1} ({2} int, {3} string)"
            .format(unique_database, table_name, column_names[0], column_names[1]),
            user_client, True)
        self._run_query_with_client(
            "create function {0}.{1} "
            "location '{2}' symbol='org.apache.impala.TestUdf'"
            .format(unique_database, udf_name, udf_uri),
            user_client, True)

        for grantee_type, grantee in test_data:
            for privilege in privileges:
                # Case 1: when the resource is a database.
                self._test_grant_revoke_by_owner_on_database(
                    privilege, unique_database, grantee_type, grantee,
                    resource_owner_role)

                # Case 2: when the resource is a table.
                self._test_grant_revoke_by_owner_on_table(
                    privilege, unique_database, table_name, grantee_type, grantee,
                    resource_owner_role)

                # Case 3: when the resource is a column.
                self._test_grant_revoke_by_owner_on_column(
                    privilege, column_names, unique_database, table_name,
                    grantee_type, grantee, resource_owner_role)

                # Case 4: when the resource is a UDF.
                self._test_grant_revoke_by_owner_on_udf(
                    privilege, unique_database, udf_name, grantee_type, grantee)
    finally:
        admin_client.execute(
            "drop database if exists {0} cascade".format(unique_database))
        admin_client.execute("drop role {}".format(grantee_role))
        admin_client.execute("drop role {}".format(resource_owner_role))
        admin_client.execute(
            "revoke create on server from user {0}".format(OWNER_USER))
        admin_client.execute("revoke all on uri '{0}' from user {1}"
                             .format(udf_uri, OWNER_USER))
        admin_client.close()
        user_client.close()
|
|
|
|
def _test_grant_revoke_by_owner_on_database(self, privilege, unique_database,
        grantee_type, grantee, resource_owner_role):
    """Checks that OWNER_USER can grant 'privilege' on 'unique_database' to 'grantee'
    and revoke it again while the user OWNER_USER owns the database, and that both
    statements are rejected while the database is owned by a group or by a role that
    merely has the same name. The database is owned by the user OWNER_USER again if
    all the checks pass."""
    target = (privilege, unique_database, grantee_type, grantee)
    grant_stmt = "grant {0} on database {1} to {2} {3}".format(*target)
    revoke_stmt = "revoke {0} on database {1} from {2} {3}".format(*target)
    show_grant_stmt = "show grant {0} {1} on database {2}".format(
        grantee_type, grantee, unique_database)
    resource_owner_group = OWNER_USER
    admin_client = self.create_impala_client(user=ADMIN)
    user_client = self.create_impala_client(user=OWNER_USER)

    def assert_owner_user_is_rejected():
        # Neither GRANT nor REVOKE may be run by the user 'OWNER_USER'.
        outcome = self._run_query_with_client(grant_stmt, user_client, False)
        assert ERROR_GRANT in str(outcome)
        outcome = self._run_query_with_client(revoke_stmt, user_client, False)
        assert ERROR_REVOKE in str(outcome)

    try:
        # The owning user grants the privilege, which then shows up in SHOW GRANT.
        self._run_query_with_client(grant_stmt, user_client, True)
        TestRanger._check_privileges(admin_client.execute(show_grant_stmt), [
            [grantee_type, grantee, unique_database, "", "", "", "", "", "*",
             privilege, "false"],
            [grantee_type, grantee, unique_database, "*", "*", "", "", "", "",
             privilege, "false"]])

        # The owning user revokes it again, which leaves nothing to show.
        self._run_query_with_client(revoke_stmt, user_client, True)
        TestRanger._check_privileges(admin_client.execute(show_grant_stmt), [])

        # Hand the database over to a group named like 'OWNER_USER'. This goes through
        # Hive since ALTER DATABASE SET OWNER GROUP is currently not supported in
        # Impala, so Impala has to be forced to reload the metadata afterwards.
        self.run_stmt_in_hive("alter database {0} set owner group {1}"
                              .format(unique_database, resource_owner_group))
        admin_client.execute("invalidate metadata")
        assert_owner_user_is_rejected()

        # Hand the database over to a role named like 'OWNER_USER'.
        admin_client.execute("alter database {0} set owner role {1}"
                             .format(unique_database, resource_owner_role))
        assert_owner_user_is_rejected()

        # Change the database owner back to the user 'OWNER_USER'.
        admin_client.execute("alter database {0} set owner user {1}"
                             .format(unique_database, OWNER_USER))
    finally:
        # Revoke the privilege as the admin in case a REVOKE submitted by 'OWNER_USER'
        # did not go through, so that this test does not interfere with other tests.
        admin_client.execute(revoke_stmt)
        admin_client.close()
        user_client.close()
|
|
|
|
def _test_grant_revoke_by_owner_on_table(self, privilege, unique_database, table_name,
        grantee_type, grantee, resource_owner_role):
    """Checks that OWNER_USER can grant 'privilege' on 'unique_database'.'table_name'
    to 'grantee' and revoke it again while the user OWNER_USER owns the table, and
    that both statements are rejected while the table is owned by a group or by a
    role that merely has the same name. Does nothing for the CREATE privilege. The
    table is owned by the user OWNER_USER again if all the checks pass."""
    # The CREATE privilege on a table is not supported.
    if privilege == "create":
        return
    target = (privilege, unique_database, table_name, grantee_type, grantee)
    grant_stmt = "grant {0} on table {1}.{2} to {3} {4}".format(*target)
    revoke_stmt = "revoke {0} on table {1}.{2} from {3} {4}".format(*target)
    show_grant_stmt = "show grant {0} {1} on table {2}.{3}".format(
        grantee_type, grantee, unique_database, table_name)
    resource_owner_group = OWNER_USER
    admin_client = self.create_impala_client(user=ADMIN)
    user_client = self.create_impala_client(user=OWNER_USER)

    def assert_owner_user_is_rejected():
        # Neither GRANT nor REVOKE may be run by the user 'OWNER_USER'.
        outcome = self._run_query_with_client(grant_stmt, user_client, False)
        assert ERROR_GRANT in str(outcome)
        outcome = self._run_query_with_client(revoke_stmt, user_client, False)
        assert ERROR_REVOKE in str(outcome)

    try:
        # The owning user grants the privilege, which then shows up in SHOW GRANT.
        self._run_query_with_client(grant_stmt, user_client, True)
        TestRanger._check_privileges(admin_client.execute(show_grant_stmt), [
            [grantee_type, grantee, unique_database, table_name, "*", "", "", "",
             "", privilege, "false"]])

        # The owning user revokes it again, which leaves nothing to show.
        self._run_query_with_client(revoke_stmt, user_client, True)
        TestRanger._check_privileges(admin_client.execute(show_grant_stmt), [])

        # Hand the table over to a group named like 'OWNER_USER'. This goes through
        # Hive since ALTER TABLE SET OWNER GROUP is currently not supported in Impala,
        # so Impala has to be forced to reload the metadata of the table afterwards.
        self.run_stmt_in_hive(
            "alter table {0}.{1} set owner group {2}"
            .format(unique_database, table_name, resource_owner_group))
        admin_client.execute("refresh {0}.{1}".format(unique_database, table_name))
        assert_owner_user_is_rejected()

        # Hand the table over to a role named like 'OWNER_USER'.
        admin_client.execute(
            "alter table {0}.{1} set owner role {2}"
            .format(unique_database, table_name, resource_owner_role))
        assert_owner_user_is_rejected()

        # Change the table owner back to the user 'OWNER_USER'.
        admin_client.execute("alter table {0}.{1} set owner user {2}"
                             .format(unique_database, table_name, OWNER_USER))
    finally:
        # Revoke the privilege as the admin in case a REVOKE submitted by 'OWNER_USER'
        # did not go through, so that this test does not interfere with other tests.
        admin_client.execute(revoke_stmt)
        admin_client.close()
        user_client.close()
|
|
|
|
def _test_grant_revoke_by_owner_on_column(self, privilege, column_names,
        unique_database, table_name, grantee_type, grantee, resource_owner_role):
    """Checks that OWNER_USER can grant 'privilege' on the first column in
    'column_names' of 'unique_database'.'table_name' to 'grantee' and revoke it again
    while the user OWNER_USER owns the table, and that both statements are rejected
    while the table is owned by a group or by a role that merely has the same name.
    Does nothing unless 'privilege' is SELECT. The table is owned by the user
    OWNER_USER again if all the checks pass."""
    # For a column, only the SELECT privilege is allowed.
    if privilege != "select":
        return
    column = column_names[0]
    target = (privilege, column, unique_database, table_name, grantee_type, grantee)
    grant_stmt = "grant {0} ({1}) on table {2}.{3} to {4} {5}".format(*target)
    revoke_stmt = "revoke {0} ({1}) on table {2}.{3} from {4} {5}".format(*target)
    show_grant_stmt = "show grant {0} {1} on column {2}.{3}.{4}".format(
        grantee_type, grantee, unique_database, table_name, column)
    resource_owner_group = OWNER_USER
    admin_client = self.create_impala_client(user=ADMIN)
    user_client = self.create_impala_client(user=OWNER_USER)

    def assert_owner_user_is_rejected():
        # Neither GRANT nor REVOKE may be run by the user 'OWNER_USER'.
        outcome = self._run_query_with_client(grant_stmt, user_client, False)
        assert ERROR_GRANT in str(outcome)
        outcome = self._run_query_with_client(revoke_stmt, user_client, False)
        assert ERROR_REVOKE in str(outcome)

    try:
        # The owning user grants the privilege, which then shows up in SHOW GRANT.
        self._run_query_with_client(grant_stmt, user_client, True)
        TestRanger._check_privileges(admin_client.execute(show_grant_stmt), [
            [grantee_type, grantee, unique_database, table_name, column,
             "", "", "", "", privilege, "false"]])

        # The owning user revokes it again, which leaves nothing to show.
        self._run_query_with_client(revoke_stmt, user_client, True)
        TestRanger._check_privileges(admin_client.execute(show_grant_stmt), [])

        # Hand the table over to a group named like 'OWNER_USER'. The statement is run
        # in Hive, after which the table is refreshed in Impala.
        self.run_stmt_in_hive(
            "alter table {0}.{1} set owner group {2}"
            .format(unique_database, table_name, resource_owner_group))
        admin_client.execute("refresh {0}.{1}".format(unique_database, table_name))
        assert_owner_user_is_rejected()

        # Hand the table over to a role named like 'OWNER_USER'.
        admin_client.execute(
            "alter table {0}.{1} set owner role {2}"
            .format(unique_database, table_name, resource_owner_role))
        assert_owner_user_is_rejected()

        # Change the table owner back to the user 'OWNER_USER'.
        admin_client.execute("alter table {0}.{1} set owner user {2}"
                             .format(unique_database, table_name, OWNER_USER))
    finally:
        # Revoke the privilege as the admin in case a REVOKE submitted by 'OWNER_USER'
        # did not go through, so that this test does not interfere with other tests.
        admin_client.execute(revoke_stmt)
        admin_client.close()
        user_client.close()
|
|
|
|
def _test_grant_revoke_by_owner_on_udf(self, privilege, unique_database, udf_name,
        grantee_type, grantee):
    """Checks that OWNER_USER is rejected when granting 'privilege' on the UDF
    'unique_database'.'udf_name' to 'grantee' and when revoking it."""
    target = (privilege, unique_database, udf_name, grantee_type, grantee)
    grant_stmt = ("grant {0} on user_defined_fn "
                  "{1}.{2} to {3} {4}").format(*target)
    revoke_stmt = ("revoke {0} on user_defined_fn "
                   "{1}.{2} from {3} {4}").format(*target)
    # Due to IMPALA-11743 and IMPALA-12685, the owner of a UDF could not grant
    # or revoke the SELECT privilege.
    with self.create_impala_client(user=OWNER_USER) as user_client:
        outcome = self._run_query_with_client(grant_stmt, user_client, False)
        assert ERROR_GRANT in str(outcome)
        outcome = self._run_query_with_client(revoke_stmt, user_client, False)
        assert ERROR_REVOKE in str(outcome)
|
|
|
|
def _test_allow_catalog_cache_op_from_masked_users(self, unique_name):
    """Verify that catalog cache operations are allowed for masked users
    when allow_catalog_cache_op_from_masked_users=true.

    A column masking policy named 'unique_name' is added on functional.alltypestiny
    for the current OS user. INVALIDATE METADATA and REFRESH are then run as that
    user with the table unloaded, with the table loaded, and after the REFRESH
    privilege has been granted. The policy is removed and both clients are closed
    at the end."""
    user = getuser()
    admin_client = self.create_impala_client(user=ADMIN)
    non_admin_client = self.create_impala_client(user=user)
    try:
        # Create a column masking policy on 'user' which is also the owner of the table
        TestRanger._add_column_masking_policy(
            unique_name, user, "functional", "alltypestiny", "id",
            "CUSTOM", "id * 100")

        # At a cold start, the table is unloaded so its owner is unknown.
        # INVALIDATE METADATA <table> is denied since 'user' is not detected as the
        # owner.
        result = self.execute_query_expect_failure(
            non_admin_client, "invalidate metadata functional.alltypestiny")
        assert "User '{0}' does not have privileges to execute " \
            "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user) \
            in str(result)
        # Verify catalogd never loads metadata of this table
        table_loaded_log = "Loaded metadata for: functional.alltypestiny"
        self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=0)

        # Run a query to trigger metadata loading on the table
        self.execute_query_expect_success(
            non_admin_client, "describe functional.alltypestiny")
        # Verify catalogd loads metadata of this table
        self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=1)

        # INVALIDATE METADATA <table> is allowed since 'user' is detected as the owner.
        self.execute_query_expect_success(
            non_admin_client, "invalidate metadata functional.alltypestiny")

        # Run a query to trigger metadata loading on the table
        self.execute_query_expect_success(
            non_admin_client, "describe functional.alltypestiny")
        # Verify catalogd loads metadata of this table
        self.assert_catalogd_log_contains("INFO", table_loaded_log, expected_count=2)

        # Verify REFRESH <table> is allowed since 'user' is detected as the owner.
        self.execute_query_expect_success(
            non_admin_client, "refresh functional.alltypestiny")
        self.execute_query_expect_success(
            non_admin_client,
            "refresh functional.alltypestiny partition(year=2009, month=1)")

        # Clear the catalog cache and grant 'user' enough privileges
        self.execute_query_expect_success(
            admin_client, "invalidate metadata functional.alltypestiny")
        admin_client.execute(
            "grant refresh on table functional.alltypestiny to user {0}".format(user))
        try:
            # Now 'user' should be able to run REFRESH/INVALIDATE even if the table is
            # unloaded (i.e. 'user' is not recognized as the owner).
            self.execute_query_expect_success(
                non_admin_client, "invalidate metadata functional.alltypestiny")
            self.execute_query_expect_success(
                non_admin_client, "refresh functional.alltypestiny")
        finally:
            admin_client.execute(
                "revoke refresh on table functional.alltypestiny from user {0}"
                .format(user))
    finally:
        try:
            TestRanger._remove_policy(unique_name)
        finally:
            # Close the clients even if removing the policy fails; they used to be
            # left open.
            admin_client.close()
            non_admin_client.close()
|
|
|
|
def _test_no_exception_in_show_roles_if_no_roles_in_ranger(self):
    """
    Ensure that the SHOW ROLES statements succeed and return no rows
    if there are no roles in Ranger.
    """
    show_roles_statements = [
        "SHOW ROLES",
        "SHOW CURRENT ROLES",
        "SHOW ROLE GRANT GROUP admin"
    ]
    # The context manager closes the client, which used to be left open.
    with self.create_impala_client(user=ADMIN) as admin_client:
        for statement in show_roles_statements:
            result = self.execute_query_expect_success(admin_client, statement)
            assert len(result.data) == 0
|
|
|
|
def _test_ownership(self):
    """Tests ownership privileges for databases and tables with ranger along with
    some known quirks in the implementation.

    All the statements run in a database with a random name that is created by the
    ADMIN user and dropped again at the end."""
    test_user = getuser()
    test_role = 'test_role'
    test_db = "test_ranger_ownership_" + get_random_id(5).lower()
    # Create a test database as "admin" user. Owner is set accordingly.
    self._run_query_as_user("create database {0}".format(test_db), ADMIN, True)
    try:
        # Try to create a table under test_db as current user. It should fail.
        self._run_query_as_user(
            "create table {0}.foo(a int)".format(test_db), test_user, False)

        # Change the owner of the database to the current user.
        self._run_query_as_user(
            "alter database {0} set owner user {1}".format(test_db, test_user),
            ADMIN, True)

        self._run_query_as_user("refresh authorization", ADMIN, True)

        # Create should succeed now.
        self._run_query_as_user(
            "create table {0}.foo(a int)".format(test_db), test_user, True)
        # Run show tables on the db. The resulting list should be empty. This happens
        # because the created table's ownership information is not aggressively cached
        # by the current Catalog implementations. Hence the analysis pass does not
        # have access to the ownership information to verify if the current session
        # user is actually the owner. We need to fix this by caching the HMS metadata
        # more aggressively when the table loads. TODO(IMPALA-8937).
        result = self._run_query_as_user(
            "show tables in {0}".format(test_db), test_user, True)
        assert len(result.data) == 0
        # Run a simple query that warms up the table metadata and repeat SHOW TABLES.
        self._run_query_as_user(
            "select * from {0}.foo".format(test_db), test_user, True)
        result = self._run_query_as_user(
            "show tables in {0}".format(test_db), test_user, True)
        assert len(result.data) == 1
        assert "foo" in result.data
        # Change the owner of the db back to the admin user
        self._run_query_as_user(
            "alter database {0} set owner user {1}".format(test_db, ADMIN),
            ADMIN, True)
        result = self._run_query_as_user(
            "show tables in {0}".format(test_db), test_user, False)
        err = "User '{0}' does not have privileges to access: {1}.*.*". \
            format(test_user, test_db)
        assert err in str(result)
        # test_user is still the owner of the table, so select should work fine.
        self._run_query_as_user(
            "select * from {0}.foo".format(test_db), test_user, True)
        # Change the table owner back to admin.
        self._run_query_as_user(
            "alter table {0}.foo set owner user {1}".format(test_db, ADMIN),
            ADMIN, True)
        # Create the role before the role ownership checks begin.
        self._run_query_as_user("CREATE ROLE {0}".format(test_role), ADMIN, True)
        try:
            # Altering the table owner to an existing role is expected to succeed.
            stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role)
            self._run_query_as_user(stmt, ADMIN, True)
        finally:
            # Drop the role even if the statement above failed, so that the role does
            # not leak into other tests that create a role with the same name.
            self._run_query_as_user("DROP ROLE {0}".format(test_role), ADMIN, True)
        # Altering the table owner to a non-existent role is expected to fail with
        # an error saying that the role does not exist.
        stmt = "alter table {0}.foo set owner role {1}".format(test_db, test_role)
        result = self._run_query_as_user(stmt, ADMIN, False)
        err = "Role '{0}' does not exist.".format(test_role)
        assert err in str(result)
        # test_user should not be authorized to run the queries anymore.
        result = self._run_query_as_user(
            "select * from {0}.foo".format(test_db), test_user, False)
        err = ("AuthorizationException: User '{0}' does not have privileges to execute"
               + " 'SELECT' on: {1}.foo").format(test_user, test_db)
        assert err in str(result)
    finally:
        self._run_query_as_user(
            "drop database {0} cascade".format(test_db), ADMIN, True)
|
|
|
|
def _verified_multiuser_results(self, admin_client, admin_query_tmpl, user_query, users,
|
|
user_clients):
|
|
assert len(users) == len(user_clients)
|
|
for i in range(len(users)):
|
|
admin_res = admin_client.execute(admin_query_tmpl % ("'%s'" % users[i])).get_data()
|
|
user_res = user_clients[i].execute(user_query).get_data()
|
|
assert admin_res == user_res
|
|
|
|
def _test_select_view_created_by_non_superuser(self, unique_name):
  """Test that except for the necessary privileges on the view, the requesting user
  has to be granted the necessary privileges on the underlying tables as well in order
  to access a view with its table property of 'Authorized' set to false.

  'unique_name' prefixes the temporary database, tables, and view created here. It is
  also passed to TestRanger._add_deny_policy() and TestRanger._remove_policy()
  (presumably as the policy name; confirm against those helpers)."""
  grantee_user = "non_owner"
  unique_database = unique_name + "_db"
  test_tbl_1 = unique_name + "_tbl_1"
  test_tbl_2 = unique_name + "_tbl_2"
  test_view = unique_name + "_view"
  select_from_view = "select * from {0}.{1}".format(unique_database, test_view)
  # Error fragments expected while the SELECT privilege on an underlying table (or on
  # one of its columns) is missing or denied.
  denied_on_tbl_1 = "User '{0}' does not have privileges to execute 'SELECT' on: " \
      "{1}.{2}".format(grantee_user, unique_database, test_tbl_1)
  denied_on_tbl_2 = "User '{0}' does not have privileges to execute 'SELECT' on: " \
      "{1}.{2}".format(grantee_user, unique_database, test_tbl_2)

  # Open both clients as context managers, as the sibling helpers in this file do, so
  # that the connections are closed even when an assertion below fails.
  with self.create_impala_client(user=ADMIN) as admin_client, \
      self.create_impala_client(user=grantee_user) as non_owner_client:
    try:
      # Set up temp database, tables, and the view.
      admin_client.execute("drop database if exists {0} cascade"
          .format(unique_database))
      admin_client.execute("create database {0}".format(unique_database))
      admin_client.execute("create table {0}.{1} (id int, bigint_col bigint)"
          .format(unique_database, test_tbl_1))
      admin_client.execute("create table {0}.{1} (id int, string_col string)"
          .format(unique_database, test_tbl_2))
      admin_client.execute("create view {0}.{1} (abc, xyz) as "
          "select count(a.bigint_col), b.string_col "
          "from {0}.{2} a inner join {0}.{3} b on a.id = b.id "
          "group by b.string_col having count(a.bigint_col) > 1"
          .format(unique_database, test_view, test_tbl_1, test_tbl_2))
      # Add this table property to simulate a view created by a non-superuser.
      self.run_stmt_in_hive("alter view {0}.{1} "
          "set tblproperties ('Authorized' = 'false')"
          .format(unique_database, test_view))

      admin_client.execute("grant select(abc) on table {0}.{1} to user {2}"
          .format(unique_database, test_view, grantee_user))
      admin_client.execute("grant select(xyz) on table {0}.{1} to user {2}"
          .format(unique_database, test_view, grantee_user))
      admin_client.execute("grant select on table {0}.{1} to user {2}"
          .format(unique_database, test_tbl_2, grantee_user))
      admin_client.execute("refresh authorization")

      # Nothing has been granted on 'test_tbl_1' yet, so the query is not authorized.
      result = self.execute_query_expect_failure(non_owner_client, select_from_view)
      assert denied_on_tbl_1 in str(result)

      admin_client.execute("grant select(id) on table {0}.{1} to user {2}"
          .format(unique_database, test_tbl_1, grantee_user))
      admin_client.execute("refresh authorization")
      # The query is not authorized since the SELECT privilege on the column 'bigint_col'
      # in the table 'test_tbl_1' is also required.
      result = self.execute_query_expect_failure(non_owner_client, select_from_view)
      assert denied_on_tbl_1 in str(result)

      admin_client.execute("grant select(bigint_col) on table {0}.{1} to user {2}"
          .format(unique_database, test_tbl_1, grantee_user))
      admin_client.execute("refresh authorization")

      # The query is authorized successfully once sufficient privileges are granted.
      self.execute_query_expect_success(non_owner_client, select_from_view)

      # Add a deny policy to prevent 'grantee_user' from accessing the column 'id' in
      # the table 'test_tbl_2', on which 'grantee_user' had been granted the SELECT
      # privilege.
      TestRanger._add_deny_policy(unique_name, grantee_user, unique_database, test_tbl_2,
          "id")
      admin_client.execute("refresh authorization")

      # The query is not authorized since the SELECT privilege on the column 'id' in the
      # table 'test_tbl_2' has been denied in the policy above.
      result = self.execute_query_expect_failure(non_owner_client, select_from_view)
      assert denied_on_tbl_2 in str(result)
    finally:
      admin_client.execute("revoke select(abc) on table {0}.{1} from user {2}"
          .format(unique_database, test_view, grantee_user))
      admin_client.execute("revoke select(xyz) on table {0}.{1} from user {2}"
          .format(unique_database, test_view, grantee_user))
      admin_client.execute("revoke select(id) on table {0}.{1} from user {2}"
          .format(unique_database, test_tbl_1, grantee_user))
      admin_client.execute("revoke select(bigint_col) on table {0}.{1} from user {2}"
          .format(unique_database, test_tbl_1, grantee_user))
      admin_client.execute("revoke select on table {0}.{1} from user {2}"
          .format(unique_database, test_tbl_2, grantee_user))
      admin_client.execute("drop database if exists {0} cascade"
          .format(unique_database))
      TestRanger._remove_policy(unique_name)
|
|
|
|
def _test_select_calcite_frontend(self, unique_name):
  """Check column-level SELECT authorization with 'use_calcite_planner' set to 1.

  A temporary database and a two-column table, both named after 'unique_name', are
  created by the admin. The user 'non_owner' must be denied "select *" while holding
  the SELECT privilege on only one of the two columns, and must be able to run the
  query once the privilege on the second column is granted as well. Grants and the
  database are cleaned up afterwards.
  """
  grantee_user = "non_owner"
  db_name = unique_name + "_db"
  tbl_name = unique_name + "_tbl"
  select_stmt = "select * from {0}.{1}".format(db_name, tbl_name)
  denied_prefix = "User '{0}' does not have privileges to execute 'SELECT' on:" \
      .format(grantee_user)
  # Message expected for as long as a required column privilege is missing.
  denied_msg = "{0} {1}.{2}".format(denied_prefix, db_name, tbl_name)

  # Set up a temporary database and a table, then grant SELECT on the column 'id' only.
  setup_stmts = (
      "drop database if exists {0} cascade".format(db_name),
      "create database {0}".format(db_name),
      "create table {0}.{1} (id int, bigint_col bigint)".format(db_name, tbl_name),
      "refresh authorization",
      "grant select (id) on table {0}.{1} to user {2}"
      .format(db_name, tbl_name, grantee_user),
      "refresh authorization",
  )
  cleanup_stmts = (
      "revoke select (id) on table {0}.{1} from user {2}"
      .format(db_name, tbl_name, grantee_user),
      "revoke select (bigint_col) on table {0}.{1} from user {2}"
      .format(db_name, tbl_name, grantee_user),
      "refresh authorization",
      "drop database if exists {0} cascade".format(db_name),
  )

  with self.create_impala_client(user=ADMIN) as admin_client, \
      self.create_impala_client(user=grantee_user) as non_owner_client:
    # Set the query option of 'use_calcite_planner' to 1 to use the Calcite planner.
    non_owner_client.set_configuration({"use_calcite_planner": 1})
    try:
      for stmt in setup_stmts:
        admin_client.execute(stmt)

      # Even though 'grantee_user' was already granted the SELECT privilege on the
      # column 'id', 'grantee_user' still could not execute the query because the user
      # does not have the SELECT privilege on the table, or the SELECT privilege on the
      # column 'bigint_col'.
      failure = self.execute_query_expect_failure(non_owner_client, select_stmt)
      assert denied_msg in str(failure)

      admin_client.execute("grant select (bigint_col) on table {0}.{1} to user {2}"
          .format(db_name, tbl_name, grantee_user))
      admin_client.execute("refresh authorization")

      # After 'grantee_user' is granted the SELECT privilege on the column
      # 'bigint_col', the query could be executed.
      non_owner_client.execute(select_stmt)
    finally:
      for stmt in cleanup_stmts:
        admin_client.execute(stmt)
|
|
|
|
def _test_view_on_view(self, use_calcite_planner,
    v2_created_by_non_superuser, v1_created_by_non_superuser,
    v2_name, v1_name,
    priv_req_columns,
    priv_req_masked_tables,
    priv_req_masked_columns):
  """Check SELECT and runtime profile authorization for a view defined on a view.

  Parameters:
    use_calcite_planner: if True, the requesting user's client sets the query option
      'use_calcite_planner' to 1.
    v2_created_by_non_superuser, v1_created_by_non_superuser: if True, the respective
      view gets the table property 'Authorized' = 'false' through Hive for the
      duration of this check, to simulate a view created by a non-superuser.
    v2_name, v1_name: names of the queried view (v2) and of the second view (v1).
    priv_req_columns: (table, column) pairs denoting the columns on which the
      requesting user has to have the SELECT privilege in order to execute the SELECT
      query against the view v2.
    priv_req_masked_tables: the tables for which a frontend registers the masked
      privilege requests.
    priv_req_masked_columns: (table, column) pairs denoting the columns for which a
      frontend registers the masked privilege requests.
  """
  grantee_user = "non_owner"
  with self.create_impala_client(user=ADMIN) as admin_client, \
      self.create_impala_client(user=grantee_user) as non_owner_client:

    def set_unauthorized(view_name):
      # Add the table property to simulate a view created by a non-superuser.
      self.run_stmt_in_hive("alter view {0} set tblproperties "
          "('Authorized' = 'false')".format(view_name))
      admin_client.execute("invalidate metadata {0}".format(view_name))

    def unset_unauthorized(view_name):
      self.run_stmt_in_hive("alter view {0} unset tblproperties ('Authorized')"
          .format(view_name))
      admin_client.execute("invalidate metadata {0}".format(view_name))

    def grant_columns(table_column_pairs):
      for table_name, column_name in table_column_pairs:
        admin_client.execute("grant select ({0}) on table {1} to user {2}"
            .format(column_name, table_name, grantee_user))

    def revoke_columns(table_column_pairs):
      for table_name, column_name in table_column_pairs:
        admin_client.execute("revoke select ({0}) on table {1} from user {2}"
            .format(column_name, table_name, grantee_user))

    if use_calcite_planner is True:
      # Set the query option of 'use_calcite_planner' to 1 to use the Calcite planner.
      non_owner_client.set_configuration({"use_calcite_planner": 1})
    select_stmt = "select * from {0}".format(v2_name)
    denied_prefix = "User '{0}' does not have privileges to execute " \
        "'SELECT' on:".format(grantee_user)
    profile_denied_msg = "User {0} is not authorized to access the runtime profile " \
        "or execution summary.".format(grantee_user)
    try:
      if v2_created_by_non_superuser is True:
        set_unauthorized(v2_name)
      if v1_created_by_non_superuser is True:
        set_unauthorized(v1_name)

      # Without any grant, the query against v2 must be rejected.
      outcome = self.execute_query_expect_failure(non_owner_client, select_stmt)
      assert "{0} {1}".format(denied_prefix, v2_name) in str(outcome)

      grant_columns(priv_req_columns)
      admin_client.execute("refresh authorization")
      # Recall that by default the Impala client would always attempt to retrieve the
      # runtime profile after query execution. The following verifies except for the
      # case in which both views were created by a non-superuser, 'grantee_user' will
      # not be able to retrieve the runtime profile even though the SELECT query could
      # be executed. Note that no masked privilege request would be registered when
      # both views were created by a non-superuser.
      a_view_created_by_superuser = (v2_created_by_non_superuser is False
          or v1_created_by_non_superuser is False)
      if a_view_created_by_superuser:
        outcome = self.execute_query_expect_failure(non_owner_client, select_stmt)
        assert profile_denied_msg in str(outcome)
      else:
        non_owner_client.execute(select_stmt)

      # Once we grant to 'grantee_user' the privileges on the tables and columns for
      # which the masked privilege requests were registered, query could be executed
      # and the runtime profile could be retrieved.
      for masked_table in priv_req_masked_tables:
        admin_client.execute("grant select on table {0} to user {1}"
            .format(masked_table, grantee_user))
      grant_columns(priv_req_masked_columns)
      admin_client.execute("refresh authorization")
      non_owner_client.execute(select_stmt)
    finally:
      revoke_columns(priv_req_columns)
      if v2_created_by_non_superuser is True:
        unset_unauthorized(v2_name)
      if v1_created_by_non_superuser is True:
        unset_unauthorized(v1_name)
      for masked_table in priv_req_masked_tables:
        admin_client.execute("revoke select on table {0} from user {1}"
            .format(masked_table, grantee_user))
      revoke_columns(priv_req_masked_columns)
      admin_client.execute("refresh authorization")
|
|
|
|
def _test_view_on_view_all_configs(self, unique_name):
  """
  This verifies 4 possible combinations of the table property of 'Authorized' when a
  view (v2) is defined on top of another view (v1) for both the classic planner and the
  Calcite planner.
  The expected behavior could be summarized as follows.
  1. In order to execute the SELECT query against v2, the requesting user always has
     to have the SELECT privilege on v2, or the SELECT privilege on the selected column
     in v2.
  2. For a view v that was created by a superuser, i.e., v does not have the table
     property of 'Authorized' set to "false", privilege requests for the underlying
     tables, views, and the columns of v would be registered as the masked ones,
     meaning that the requesting user does not have to be granted the privileges on
     those referenced resources by v to execute a query against v.
  3. However, if there is any registered masked privilege request for any table or
     column on which the requesting user does not have the privilege, the requesting
     user will not be able to retrieve the runtime profile.

  'unique_name' prefixes the temporary database, table, and views created here; the
  database is dropped again when the method finishes.
  """
  unique_database = unique_name + "_db"
  unique_table = unique_database + "." + unique_name + "_tbl"
  unique_view_1 = unique_database + "." + unique_name + "_v1"
  unique_view_2 = unique_database + "." + unique_name + "_v2"
  planner_options = [True, False]

  v2_id = (unique_view_2, "id")
  v1_id = (unique_view_1, "id")
  tbl_id = (unique_table, "id")
  # One entry per combination of the 'Authorized' property, in the order:
  # (v2_created_by_non_superuser, v1_created_by_non_superuser, priv_req_columns,
  #  priv_req_masked_tables, priv_req_masked_columns).
  scenarios = [
      (False, False, [v2_id], [unique_view_1, unique_table], [v1_id, tbl_id]),
      (False, True, [v2_id], [unique_view_1, unique_table], [v1_id, tbl_id]),
      (True, False, [v2_id, v1_id], [unique_table], [tbl_id]),
      (True, True, [v2_id, v1_id, tbl_id], [], []),
  ]

  # Open the client as a context manager, as the sibling helpers in this file do, so
  # that the connection is closed even when one of the scenarios fails.
  with self.create_impala_client(user=ADMIN) as admin_client:
    try:
      # Set up a temporary database, a table, and 2 views.
      admin_client.execute("drop database if exists {0} cascade"
          .format(unique_database))
      admin_client.execute("create database {0}".format(unique_database))
      admin_client.execute("create table {0} (id int)".format(unique_table))
      admin_client.execute("create view {0} as select * from {1}"
          .format(unique_view_1, unique_table))
      admin_client.execute("create view {0} as select * from {1}"
          .format(unique_view_2, unique_view_1))

      for use_calcite_planner in planner_options:
        for (v2_by_non_superuser, v1_by_non_superuser, req_columns, masked_tables,
            masked_columns) in scenarios:
          self._test_view_on_view(use_calcite_planner,
              v2_by_non_superuser, v1_by_non_superuser,
              unique_view_2, unique_view_1,
              req_columns, masked_tables, masked_columns)
    finally:
      admin_client.execute("drop database if exists {0} cascade"
          .format(unique_database))
|
|
|
|
def _test_table_masking_calcite_frontend(self, unique_name):
  """
  This verifies table masking is not yet supported by the Calcite planner.

  A column masking policy and a row filtering policy are added for the user
  'non_owner' on two tables of the database 'functional'. With the query option
  'use_calcite_planner' set to 1, each query is expected to succeed while its runtime
  profile mentions "UnsupportedFeatureException". The policies are named 'unique_name'
  followed by a running index and are removed again during cleanup.
  """
  grantee_user = "non_owner"
  database = "functional"
  column_masked_table = "alltypes"
  row_filtered_table = "alltypestiny"

  with self.create_impala_client(user=ADMIN) as admin_client, \
      self.create_impala_client(user=grantee_user) as non_owner_client:
    non_owner_client.set_configuration({"use_calcite_planner": 1})
    # Names of the policies added so far; cleanup removes exactly these.
    added_policies = []

    def grant_and_expect_unsupported(table_name):
      # Grant SELECT on 'id', run the query as the grantee, and inspect its profile.
      admin_client.execute("grant select (id) on table {0}.{1} to user {2}"
          .format(database, table_name, grantee_user))
      outcome = self.execute_query_expect_success(
          non_owner_client, "select id from {0}.{1}".format(database, table_name))
      assert "UnsupportedFeatureException" in str(outcome.runtime_profile)

    try:
      policy_name = unique_name + str(len(added_policies))
      TestRanger._add_column_masking_policy(
          policy_name, grantee_user, database, column_masked_table, "id",
          "CUSTOM", "id * 100")
      added_policies.append(policy_name)
      grant_and_expect_unsupported(column_masked_table)

      policy_name = unique_name + str(len(added_policies))
      TestRanger._add_row_filtering_policy(
          policy_name, grantee_user, database, row_filtered_table, "id % 2 = 0")
      added_policies.append(policy_name)
      grant_and_expect_unsupported(row_filtered_table)
    finally:
      for table_name in (column_masked_table, row_filtered_table):
        admin_client.execute("revoke select (id) on table {0}.{1} from user {2}"
            .format(database, table_name, grantee_user))
      for policy_name in added_policies:
        TestRanger._remove_policy(policy_name)
|
|
|
|
|
|
class TestRangerIndependent(TestRanger):
|
|
"""
|
|
Tests for Apache Ranger integration with Apache Impala that require cluster restart
|
|
between test method.
|
|
"""
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impala_log_dir=tempfile.mkdtemp(prefix="ranger_audit_xff", dir=os.getenv("LOG_DIR")),
|
|
impalad_args=(IMPALAD_ARGS + " --use_xff_address_as_origin=true"),
|
|
catalogd_args=CATALOGD_ARGS,
|
|
disable_log_buffering=True)
|
|
def test_xff_ranger_audit(self):
|
|
"""
|
|
Tests XFF client IP is included in ranger audit logs when using hs2-http protocol
|
|
"""
|
|
# Iterate over test vector within test function to avoid restarting cluster.
|
|
for vector in\
|
|
[ImpalaTestVector([value]) for value in create_client_protocol_dimension()]:
|
|
protocol = vector.get_value("protocol")
|
|
if protocol != "hs2-http":
|
|
continue
|
|
|
|
# Query with XFF header in client request
|
|
args = ['--protocol=hs2-http',
|
|
'--hs2_x_forward= 10.20.30.40 ',
|
|
'-q', 'select count(1) from functional.alltypes']
|
|
run_impala_shell_cmd(vector, args)
|
|
|
|
# Query with XFF header in client request
|
|
args = ['--protocol=hs2-http',
|
|
'--hs2_x_forward=10.20.30.40, 1.1.2.3, 127.0.0.6',
|
|
'-q', 'select count(1) from functional.alltypes']
|
|
run_impala_shell_cmd(vector, args)
|
|
|
|
# Query with XFF header in client request
|
|
args = ['--protocol=hs2-http',
|
|
'--hs2_x_forward=10.20.30.40,1.1.2.3,127.0.0.6',
|
|
'-q', 'select count(1) from functional.alltypes']
|
|
run_impala_shell_cmd(vector, args)
|
|
|
|
# Query without XFF header in client request
|
|
args = ['--protocol=hs2-http',
|
|
'-q', 'select count(2) from functional.alltypes']
|
|
run_impala_shell_cmd(vector, args)
|
|
|
|
# Query with empty XFF header in client request
|
|
args = ['--protocol=hs2-http',
|
|
'--hs2_x_forward= ',
|
|
'-q', 'select count(2) from functional.alltypes']
|
|
run_impala_shell_cmd(vector, args)
|
|
|
|
# Query with invalid XFF header in client request
|
|
args = ['--protocol=hs2-http',
|
|
'--hs2_x_forward=foobar',
|
|
'-q', 'select count(3) from functional.alltypes']
|
|
run_impala_shell_cmd(vector, args)
|
|
|
|
# Shut down cluster to ensure logs flush to disk.
|
|
sleep(5)
|
|
self._stop_impala_cluster()
|
|
|
|
# Expected audit log string
|
|
expected_string_valid_xff = (
|
|
'"access":"select",'
|
|
'"resource":"functional/alltypes",'
|
|
'"resType":"@table",'
|
|
'"action":"select",'
|
|
'"result":1,'
|
|
'"agent":"impala",'
|
|
r'"policy":\d,'
|
|
'"enforcer":"ranger-acl",'
|
|
'"cliIP":"%s",'
|
|
'"reqData":"%s",'
|
|
'".+":".+","logType":"RangerAudit"'
|
|
)
|
|
|
|
# Ensure audit logs were logged in coordinator logs
|
|
self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
|
|
("10.20.30.40", r"select count\(1\) from functional.alltypes"),
|
|
expected_count=3)
|
|
self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
|
|
("127.0.0.1", r"select count\(2\) from functional.alltypes"), expected_count=2)
|
|
self.assert_impalad_log_contains("INFO", expected_string_valid_xff %
|
|
("foobar", r"select count\(3\) from functional.alltypes"), expected_count=1)
|
|
|
|
@pytest.mark.execute_serially
|
|
@SkipIf.is_test_jdk
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True)
|
|
def test_show_grant_hive_privilege(self, unique_name):
|
|
user = getuser()
|
|
admin_client = self.create_impala_client(user=ADMIN)
|
|
unique_db = unique_name + "_db"
|
|
resource = {
|
|
"database": unique_db,
|
|
"column": "*",
|
|
"table": "*"
|
|
}
|
|
access = ["lock", "select"]
|
|
|
|
try:
|
|
TestRanger._grant_ranger_privilege(user, resource, access)
|
|
admin_client.execute("drop database if exists {0} cascade".format(unique_db))
|
|
admin_client.execute("create database {0}".format(unique_db))
|
|
|
|
admin_client.execute("refresh authorization")
|
|
result = self.client.execute("show grant user {0} on database {1}"
|
|
.format(user, unique_db))
|
|
|
|
TestRanger._check_privileges(result, [
|
|
["USER", user, unique_db, "*", "*", "", "", "", "", "select", "false"]
|
|
])
|
|
|
|
# Assert that lock, select privilege exists in Ranger server
|
|
assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db)
|
|
assert "select" in TestRanger._get_ranger_privileges_db(user, unique_db)
|
|
|
|
admin_client.execute("revoke select on database {0} from user {1}"
|
|
.format(unique_db, user))
|
|
|
|
# Assert that lock is still present and select is revoked in Ranger server
|
|
assert "lock" in TestRanger._get_ranger_privileges_db(user, unique_db)
|
|
assert "select" not in TestRanger._get_ranger_privileges_db(user, unique_db)
|
|
|
|
admin_client.execute("refresh authorization")
|
|
result = self.client.execute("show grant user {0} on database {1}"
|
|
.format(user, unique_db))
|
|
|
|
TestRanger._check_privileges(result, [])
|
|
finally:
|
|
admin_client.execute("drop database if exists {0} cascade".format(unique_db))
|
|
TestRanger._revoke_ranger_privilege(user, resource, access)
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS, reset_ranger=True)
|
|
def test_grant_multiple_columns(self):
|
|
self._test_grant_multiple_columns(13)
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args=IMPALAD_ARGS,
|
|
catalogd_args="{0} {1}".format(CATALOGD_ARGS, "--consolidate_grant_revoke_requests"),
|
|
reset_ranger=True)
|
|
def test_grant_multiple_columns_consolidate_grant_revoke_requests(self):
|
|
self._test_grant_multiple_columns(1)
|
|
|
|
@SkipIfFS.hive
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args=LEGACY_CATALOG_IMPALAD_ARGS,
|
|
catalogd_args=LEGACY_CATALOG_CATALOGD_ARGS + " --hms_event_polling_interval_s=5")
|
|
def test_alter_owner_hms_event_sync(self, unique_name):
|
|
"""Test Impala queries that depends on database ownership changes in Hive.
|
|
Use a longer polling interval to mimic lag in event processing."""
|
|
test_user = getuser()
|
|
test_db = unique_name + "_db"
|
|
# A client that only used by 'test_user'. Just need to set the username at the
|
|
# first statement. It will keep using the same username.
|
|
user_client = self.create_impala_client(user=test_user)
|
|
user_client.set_configuration({"sync_hms_events_wait_time_s": 10,
|
|
"sync_hms_events_strict_mode": True})
|
|
|
|
# Methods to change ownership in Hive which will generate ALTER_DATABASE events.
|
|
def change_db_owner_to_user():
|
|
self.run_stmt_in_hive(
|
|
"alter database {0} set owner user {1}".format(test_db, test_user), ADMIN)
|
|
|
|
def reset_db_owner_to_admin():
|
|
self.run_stmt_in_hive(
|
|
"alter database {0} set owner user {1}".format(test_db, ADMIN), ADMIN)
|
|
|
|
# Create a test database as "admin" user. Owner is set accordingly.
|
|
# By default, only the "admin" user and owner of the db can read/write this db.
|
|
self._run_query_as_user(
|
|
"drop database if exists {0} cascade".format(test_db), ADMIN, expect_success=True)
|
|
self._run_query_as_user(
|
|
"create database {0}".format(test_db), ADMIN, expect_success=True)
|
|
try:
|
|
# Test table statement waits for db alter owner events
|
|
# Try to create a table under test_db as current user. It should fail since
|
|
# test_user is not the db owner.
|
|
create_tbl_stmt = "create table {0}.foo(a int)".format(test_db)
|
|
self.execute_query_expect_failure(user_client, create_tbl_stmt)
|
|
change_db_owner_to_user()
|
|
# Creating the table again should succeed once the ALTER_DATABASE event is synced.
|
|
self.execute_query_expect_success(user_client, create_tbl_stmt)
|
|
reset_db_owner_to_admin()
|
|
|
|
# Test table statement waits for table alter owner events
|
|
stmts = [
|
|
"describe {}.foo".format(test_db),
|
|
"insert into {}.foo values (0)".format(test_db),
|
|
"select * from {}.foo".format(test_db),
|
|
"compute stats {}.foo".format(test_db),
|
|
"refresh {}.foo".format(test_db),
|
|
"drop table {}.foo".format(test_db),
|
|
]
|
|
for stmt in stmts:
|
|
# Change table owner to admin
|
|
self.run_stmt_in_hive(
|
|
"alter table {0}.foo set owner user {1}".format(test_db, ADMIN), ADMIN)
|
|
self.execute_query_expect_failure(user_client, stmt)
|
|
# Change table owner to user
|
|
self.run_stmt_in_hive(
|
|
"alter table {0}.foo set owner user {1}".format(test_db, test_user), ADMIN)
|
|
self.execute_query_expect_success(user_client, stmt)
|
|
# Create the table again since the last statement is DROP TABLE.
|
|
change_db_owner_to_user()
|
|
self.execute_query_expect_success(user_client, create_tbl_stmt)
|
|
reset_db_owner_to_admin()
|
|
|
|
# Test SHOW DATABASES waits for db change owner events. The db is invisible if user
|
|
# is not the owner.
|
|
assert test_db not in self.all_db_names(user_client)
|
|
change_db_owner_to_user()
|
|
assert test_db in self.all_db_names(user_client)
|
|
reset_db_owner_to_admin()
|
|
|
|
# Test SHOW TABLES
|
|
# Run a query on the table to make its ownership info loaded. Otherwise, it will
|
|
# be missing in SHOW TABLES due to IMPALA-8937.
|
|
self.execute_query_expect_success(user_client, "describe {}.foo".format(test_db))
|
|
# SHOW TABLES should fail since user is not the owner of this db
|
|
self.execute_query_expect_failure(user_client, "show tables in " + test_db)
|
|
change_db_owner_to_user()
|
|
# IMPALA-8937: In local catalog mode, table ownership info is not reloaded
|
|
# automatically, and this assert will fail.
|
|
assert ["foo"] == self.all_table_names(user_client, test_db)
|
|
reset_db_owner_to_admin()
|
|
finally:
|
|
self._run_query_as_user("drop database {0} cascade".format(test_db), ADMIN, True)
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="{0} {1}".format(LEGACY_CATALOG_IMPALAD_ARGS,
|
|
"--allow_catalog_cache_op_from_masked_users=true"),
|
|
catalogd_args=LEGACY_CATALOG_CATALOGD_ARGS,
|
|
disable_log_buffering=True)
|
|
def test_allow_metadata_update_legacy_catalog(self, unique_name):
|
|
self._test_allow_catalog_cache_op_from_masked_users(unique_name)
|
|
|
|
@pytest.mark.execute_serially
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args="{0} {1}".format(IMPALAD_ARGS,
|
|
"--allow_catalog_cache_op_from_masked_users=true"),
|
|
catalogd_args=CATALOGD_ARGS,
|
|
disable_log_buffering=True)
|
|
def test_allow_metadata_update_local_catalog(self, unique_name):
|
|
self._test_allow_catalog_cache_op_from_masked_users(unique_name)
|
|
|
|
@pytest.mark.execute_serially
|
|
@SkipIf.is_test_jdk
|
|
@SkipIfFS.hive
|
|
@SkipIfHive2.ranger_auth
|
|
@CustomClusterTestSuite.with_args()
|
|
def test_hive_with_ranger_setup(self, vector):
|
|
"""Test for setup of Hive-Ranger integration. Make sure future upgrades on
|
|
Hive/Ranger won't break the tool."""
|
|
script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/run-hive-server.sh')
|
|
try:
|
|
# Add the policy before restarting Hive. So it can take effect immediately after
|
|
# HiveServer2 starts.
|
|
TestRanger._add_column_masking_policy(
|
|
"col_mask_for_hive", getuser(), "functional", "alltypestiny", "id", "CUSTOM",
|
|
"{col} * 100")
|
|
check_call([script, '-with_ranger'])
|
|
self.run_test_case("QueryTest/hive_ranger_integration", vector)
|
|
finally:
|
|
check_call([script])
|
|
TestRanger._remove_policy("col_mask_for_hive")
|
|
|
|
@pytest.mark.execute_serially
|
|
@SkipIfFS.incorrent_reported_ec
|
|
@CustomClusterTestSuite.with_args(
|
|
# We additionally provide impalad and catalogd with the customized user-to-groups
|
|
# mapper since some test cases in grant_revoke.test require Impala to retrieve the
|
|
# groups a given user belongs to and such users might not exist in the underlying
|
|
# OS in the testing environment, e.g., the user 'non_owner'.
|
|
impalad_args="{0} {1}".format(IMPALAD_ARGS,
|
|
"--use_customized_user_groups_mapper_for_ranger"),
|
|
catalogd_args="{0} {1}".format(CATALOGD_ARGS,
|
|
"--use_customized_user_groups_mapper_for_ranger"))
|
|
def test_grant_revoke_with_role(self, vector):
|
|
"""Test grant/revoke with role."""
|
|
admin_client = self.create_impala_client(user=ADMIN)
|
|
try:
|
|
self.run_test_case('QueryTest/grant_revoke', vector, use_db="default")
|
|
finally:
|
|
# Below are the statements that need to be executed in order to clean up the
|
|
# privileges granted to the test roles as well as the test roles themselves.
|
|
# Note that we need to revoke those previously granted privileges so that each role
|
|
# is not referenced by any policy before we delete those roles.
|
|
# Moreover, we need to revoke the privilege on the database 'grant_rev_db' before
|
|
# dropping 'grant_rev_db'. Otherwise, the revocation would fail due to an
|
|
# AnalysisException thrown because 'grant_rev_db' does not exist.
|
|
cleanup_statements = [
|
|
"revoke all on database grant_rev_db from grant_revoke_test_ALL_TEST_DB",
|
|
"revoke all on server from grant_revoke_test_ALL_SERVER",
|
|
"revoke all on table functional.alltypes from grant_revoke_test_NON_OWNER",
|
|
"revoke grant option for all on database functional "
|
|
"from grant_revoke_test_NON_OWNER",
|
|
"REVOKE SELECT (a, b, c, d, e, x, y) ON TABLE grant_rev_db.test_tbl3 "
|
|
"FROM grant_revoke_test_ALL_SERVER",
|
|
"REVOKE ALL ON DATABASE functional FROM grant_revoke_test_NON_OWNER",
|
|
"REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 FROM grant_revoke_test_NON_OWNER",
|
|
"REVOKE GRANT OPTION FOR SELECT (a, c) ON TABLE grant_rev_db.test_tbl3 "
|
|
"FROM grant_revoke_test_ALL_SERVER",
|
|
"REVOKE SELECT ON TABLE grant_rev_db.test_tbl1 "
|
|
"FROM grant_revoke_test_SELECT_INSERT_TEST_TBL",
|
|
"REVOKE INSERT ON TABLE grant_rev_db.test_tbl1 "
|
|
"FROM grant_revoke_test_SELECT_INSERT_TEST_TBL",
|
|
"REVOKE SELECT ON TABLE grant_rev_db.test_tbl3 "
|
|
"FROM grant_revoke_test_NON_OWNER",
|
|
"REVOKE SELECT (a) ON TABLE grant_rev_db.test_tbl3 "
|
|
"FROM grant_revoke_test_NON_OWNER",
|
|
"REVOKE SELECT (a, c, e) ON TABLE grant_rev_db.test_tbl3 "
|
|
"FROM grant_revoke_test_ALL_SERVER",
|
|
"revoke all on server server1 from grant_revoke_test_ALL_SERVER1",
|
|
"revoke select(col1) on table grant_rev_db.test_tbl4 "
|
|
"from role grant_revoke_test_COLUMN_PRIV",
|
|
"{0}{1}{2}".format("revoke all on uri '",
|
|
os.getenv("FILESYSTEM_PREFIX"),
|
|
"/test-warehouse/grant_rev_test_tbl2'"
|
|
"from grant_revoke_test_ALL_URI"),
|
|
"{0}{1}{2}".format("revoke all on uri '",
|
|
os.getenv("FILESYSTEM_PREFIX"),
|
|
"/test-warehouse/GRANT_REV_TEST_TBL3'"
|
|
"from grant_revoke_test_ALL_URI"),
|
|
"{0}{1}{2}".format("revoke all on uri '",
|
|
os.getenv("FILESYSTEM_PREFIX"),
|
|
"/test-warehouse/grant_rev_test_prt'"
|
|
"from grant_revoke_test_ALL_URI"),
|
|
"drop role grant_revoke_test_ALL_TEST_DB",
|
|
"drop role grant_revoke_test_ALL_SERVER",
|
|
"drop role grant_revoke_test_SELECT_INSERT_TEST_TBL",
|
|
"drop role grant_revoke_test_ALL_URI",
|
|
"drop role grant_revoke_test_NON_OWNER",
|
|
"drop role grant_revoke_test_ALL_SERVER1",
|
|
"drop role grant_revoke_test_COLUMN_PRIV",
|
|
"drop database grant_rev_db cascade"
|
|
]
|
|
|
|
for statement in cleanup_statements:
|
|
try:
|
|
admin_client.execute(statement)
|
|
except Exception:
|
|
# There could be an exception thrown due to the non-existence of the role or
|
|
# resource involved in a statement that aims to revoke the privilege on a
|
|
# resource from a role, but we do not have to handle such an exception. We only
|
|
# need to make sure in the case when the role and the corresponding resource
|
|
# exist, the granted privilege is revoked. The same applies to the case when we
|
|
# drop a role.
|
|
pass
|
|
|
|
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
  impalad_args=IMPALAD_ARGS,
  catalogd_args=CATALOGD_ARGS,
  # 'reset_ranger' is additionally set to True so that all the policies in the Ranger
  # service are reset before this test starts. Any role that existed beforehand is
  # therefore deleted, and the test does not have to account for pre-existing roles.
  reset_ranger=True)
def test_no_exception_in_show_roles_if_no_roles_in_ranger(self):
  """Runs the shared check of the same name on a cluster started with a freshly reset
  Ranger service."""
  self._test_no_exception_in_show_roles_if_no_roles_in_ranger()
|
|
|
|
|
|
@CustomClusterTestSuite.with_args(
  impalad_args=LEGACY_CATALOG_IMPALAD_ARGS,
  catalogd_args=LEGACY_CATALOG_CATALOGD_ARGS)
class TestRangerLegacyCatalog(TestRanger):
  """
  Tests for Apache Ranger integration with Apache Impala in legacy catalog mode.
  All the test methods of this class share a common cluster.
  """

  @pytest.mark.execute_serially
  def test_grant_revoke_with_catalog_v1(self, unique_name):
    """Tests grant/revoke with catalog v1."""
    refresh_statements = [None, "invalidate metadata", "refresh authorization"]
    self._test_grant_revoke(unique_name, refresh_statements)

  @pytest.mark.execute_serially
  @SkipIfFS.hdfs_acls
  def test_insert_with_catalog_v1(self, unique_name):
    """
    Verifies that, with Ranger as the authorization provider in the legacy catalog
    mode, an authorized user can execute an INSERT against a partitioned table without
    Impala throwing an AnalysisException, even though the table path and the partition
    path are not writable according to HDFS permission.
    """
    requesting_user = getuser()
    admin_client = self.create_impala_client(user=ADMIN)
    db = unique_name + "_db"
    tbl = unique_name + "_tbl"
    part_col = "year"
    part_val = "2008"
    tbl_path = "test-warehouse/{0}.db/{1}".format(db, tbl)
    part_path = "{0}/{1}={2}".format(tbl_path, part_col, part_val)
    insert_stmt = ("insert into {0}.{1} (name) partition ({2}) "
                   "values (\"Adam\", {3})").format(db, tbl, part_col, part_val)
    expected_authz_err = (
        "AuthorizationException: User '{0}' does not have privileges to "
        "execute 'INSERT' on: {1}.{2}").format(requesting_user, db, tbl)
    drop_db_stmt = "drop database if exists {0} cascade".format(db)
    # Statements that build the partitioned table used by the test, in execution order.
    setup_stmts = [
        drop_db_stmt,
        "create database {0}".format(db),
        "create table {0}.{1} (name string) partitioned by ({2} int)"
        .format(db, tbl, part_col),
        "alter table {0}.{1} add partition ({2}={3})"
        .format(db, tbl, part_col, part_val),
    ]
    try:
      for setup_stmt in setup_stmts:
        admin_client.execute(setup_stmt)

      # Hand the HDFS paths of the table and of the partition over to another owner
      # user and group, so that Impala's FsPermissionChecker no longer considers the
      # table writable by the user that loads it (usually the one representing the
      # Impala service). Before IMPALA-11871, a non-writable table path or partition
      # path would result in an AnalysisException.
      for hdfs_path in (tbl_path, part_path):
        self.hdfs_client.chown(hdfs_path, "another_user", "another_group")
      # Force the catalog server to reload the HDFS table and its partition(s).
      admin_client.execute("invalidate metadata {0}.{1}".format(db, tbl))

      # Without the INSERT privilege on the table, the statement has to be rejected
      # with an AuthorizationException.
      denied = self._run_query_as_user(insert_stmt, requesting_user, False)
      assert expected_authz_err in str(denied)

      admin_client.execute("grant insert on table {0}.{1} to user {2}"
          .format(db, tbl, requesting_user))
      # With the privilege granted, the statement succeeds without AnalysisException.
      self._run_query_as_user(insert_stmt, requesting_user, True)
    finally:
      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
          .format(db, tbl, requesting_user))
      admin_client.execute(drop_db_stmt)

  @pytest.mark.execute_serially
  @SkipIfFS.hdfs_acls
  def test_load_data_with_catalog_v1(self, unique_name):
    """
    Verifies that, with Ranger as the authorization provider in the legacy catalog
    mode, an authorized user can execute a LOAD DATA statement against a table
    partition without Impala throwing an AnalysisException, even though the partition
    path is not writable according to Impala's FsPermissionChecker.
    """
    requesting_user = getuser()
    admin_client = self.create_impala_client(user=ADMIN)
    db = unique_name + "_db"
    tbl = unique_name + "_tbl"
    part_col = "year"
    part_val = "2008"
    dest_tbl_path = "test-warehouse/{0}.db/{1}".format(db, tbl)
    dest_part_path = "{0}/{1}={2}".format(dest_tbl_path, part_col, part_val)
    file_name = "load_data_with_catalog_v1.txt"
    local_files = ["testdata/data/{0}".format(file_name)]
    source_hdfs_dir = "/tmp"
    load_data_stmt = ("load data inpath '{0}/{1}' into table {2}.{3} "
                      "partition ({4}={5})").format(
        source_hdfs_dir, file_name, db, tbl, part_col, part_val)
    expected_authz_err = (
        "AuthorizationException: User '{0}' does not have privileges to "
        "execute 'INSERT' on: {1}.{2}").format(requesting_user, db, tbl)
    drop_db_stmt = "drop database if exists {0} cascade".format(db)
    try:
      admin_client.execute(drop_db_stmt)
      admin_client.execute("create database {0}".format(db))
      copy_files_to_hdfs_dir(local_files, source_hdfs_dir)
      admin_client.execute(
          "create table {0}.{1} (name string) partitioned by ({2} int) "
          "row format delimited fields terminated by ',' "
          "stored as textfile".format(db, tbl, part_col))
      # The partition has to be added explicitly since the LOAD DATA statement can't
      # create new partitions.
      admin_client.execute("alter table {0}.{1} add partition ({2}={3})"
          .format(db, tbl, part_col, part_val))

      # Change the ownership of the HDFS path of the destination partition. Before
      # IMPALA-11871, loading data into a partition was still allowed with a
      # non-writable table path as long as the destination partition path was writable
      # according to Impala's FsPermissionChecker, whereas a non-writable destination
      # partition path resulted in an AnalysisException.
      self.hdfs_client.chown(dest_part_path, "another_user", "another_group")
      # Force the catalog server to reload the HDFS table and its partition(s).
      admin_client.execute("invalidate metadata {0}.{1}".format(db, tbl))

      # LOAD DATA requires the ALL privilege on the source HDFS path and the INSERT
      # privilege on the destination table. With neither of them the statement fails
      # with an AuthorizationException.
      denied = self._run_query_as_user(load_data_stmt, requesting_user, False)
      assert expected_authz_err in str(denied)

      admin_client.execute("grant all on uri '{0}/{1}' to user {2}"
          .format(source_hdfs_dir, file_name, requesting_user))
      # The ALL privilege on the source file alone is still not sufficient.
      denied = self._run_query_as_user(load_data_stmt, requesting_user, False)
      assert expected_authz_err in str(denied)

      admin_client.execute("grant insert on table {0}.{1} to user {2}"
          .format(db, tbl, requesting_user))
      # With both privileges granted, the statement succeeds without
      # AnalysisException.
      self._run_query_as_user(load_data_stmt, requesting_user, True)
    finally:
      admin_client.execute("revoke all on uri '{0}/{1}' from user {2}"
          .format(source_hdfs_dir, file_name, requesting_user))
      admin_client.execute("revoke insert on table {0}.{1} from user {2}"
          .format(db, tbl, requesting_user))
      admin_client.execute(drop_db_stmt)
      # Remove the copy of the data file from the source directory.
      source_file_path = "{0}/{1}".format(source_hdfs_dir, file_name)
      self.filesystem_client.delete_file_dir(source_file_path)

  @pytest.mark.execute_serially
  def test_legacy_catalog_ownership(self):
    """Runs the shared ownership test in legacy catalog mode."""
    self._test_ownership()

  @SkipIfFS.hive
  @pytest.mark.execute_serially
  def test_grant_revoke_by_owner_legacy_catalog(self, unique_name):
    """Runs the shared grant/revoke-by-owner test in legacy catalog mode."""
    self._test_grant_revoke_by_owner(unique_name)

  @SkipIfFS.hive
  @pytest.mark.execute_serially
  def test_select_view_created_by_non_superuser_with_catalog_v1(self, unique_name):
    """Runs the shared test on selecting a view created by a non-superuser in legacy
    catalog mode."""
    self._test_select_view_created_by_non_superuser(unique_name)
|
|
|
|
|
|
@CustomClusterTestSuite.with_args(
|
|
impalad_args=IMPALAD_ARGS,
|
|
catalogd_args=CATALOGD_ARGS)
|
|
class TestRangerLocalCatalog(TestRanger):
|
|
"""
|
|
Tests for Apache Ranger integration with Apache Impala in local catalog mode.
|
|
Test methods share a common cluster.
|
|
"""
|
|
|
|
@pytest.mark.execute_serially
def test_grant_revoke_with_local_catalog(self, unique_name):
  """Tests grant/revoke with catalog v2 (local catalog)."""
  refresh_statements = [None, "invalidate metadata", "refresh authorization"]
  self._test_grant_revoke(unique_name, refresh_statements)
|
|
|
|
@pytest.mark.execute_serially
def test_local_catalog_ownership(self):
  """Ownership test for local catalog mode, currently reported as an expected
  failure."""
  # TODO(bharathv): Fix in a follow up commit. getTableIfCached() in LocalCatalog loads
  # a minimal incomplete table that does not include the ownership information, hence
  # show tables *never* shows owned tables.
  # pytest.xfail() ends the test right here, so the call below only takes effect once
  # the xfail is removed.
  pytest.xfail("getTableIfCached() faulty behavior, known issue")
  self._test_ownership()
|
|
|
|
@SkipIfFS.hive
@pytest.mark.execute_serially
def test_grant_revoke_by_owner_local_catalog(self, unique_name):
  """Runs the shared grant/revoke-by-owner test in local catalog mode."""
  self._test_grant_revoke_by_owner(unique_name)
|
|
|
|
@SkipIfFS.hive
@pytest.mark.execute_serially
def test_select_view_created_by_non_superuser_with_local_catalog(self, unique_name):
  """Runs the shared test on selecting a view created by a non-superuser in local
  catalog mode."""
  self._test_select_view_created_by_non_superuser(unique_name)
|
|
|
|
@pytest.mark.execute_serially
def test_grant_option(self, unique_name):
  """Checks what SHOW GRANT reports for database-level privileges that were granted
  WITH GRANT OPTION: right after the grants, after revoking one of the privileges, and
  after revoking the grant option of the remaining one."""
  grantee = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  db = unique_name + "_db"
  tbl = unique_name + "_tbl"
  drop_db_stmt = "drop database if exists {0} cascade".format(db)
  show_grant_stmt = "show grant user {0} on database {1}".format(grantee, db)

  def fn_wildcard_row(privilege, grant_option):
    # Expected SHOW GRANT row that carries "*" in its ninth field.
    return ["USER", grantee, db, "", "", "", "", "", "*", privilege, grant_option]

  def table_wildcard_row(privilege, grant_option):
    # Expected SHOW GRANT row that carries "*" in its fourth and fifth fields.
    return ["USER", grantee, db, "*", "*", "", "", "", "", privilege, grant_option]

  def insert_without_grant_option():
    # Rows expected once only INSERT is left and the grant option is gone.
    return [fn_wildcard_row("insert", "false"), table_wildcard_row("insert", "false")]

  try:
    # Set up the temporary database and table.
    admin_client.execute(drop_db_stmt)
    admin_client.execute("create database {0}".format(db))
    admin_client.execute("create table {0}.{1} (x int)".format(db, tbl))

    # Let the user grant the SELECT and INSERT privileges on the database to others.
    for grant_template in ("grant select on database {0} to user {1} with "
                           "grant option",
                           "grant insert on database {0} to user {1} with "
                           "grant option"):
      self.execute_query_expect_success(admin_client,
                                        grant_template.format(db, grantee))

    # Both privileges are reported with the grant option set.
    result = self.execute_query(show_grant_stmt)
    TestRanger._check_privileges(result, [
        fn_wildcard_row("insert", "true"),
        fn_wildcard_row("select", "true"),
        table_wildcard_row("insert", "true"),
        table_wildcard_row("select", "true")])

    # Revoke the SELECT privilege and check what is left.
    self.execute_query_expect_success(
        admin_client,
        "revoke select on database {0} from user {1}".format(db, grantee))
    result = self.execute_query(show_grant_stmt)
    # Revoking the select privilege also deprives the grantee of the permission to
    # transfer other privilege(s) on the same resource to other principals. This is a
    # current limitation of Ranger since privileges on the same resource share the same
    # delegateAdmin field in the corresponding RangerPolicyItem.
    TestRanger._check_privileges(result, insert_without_grant_option())

    # Revoke the grant option for INSERT from the user.
    self.execute_query_expect_success(
        admin_client,
        "revoke grant option for insert "
        "on database {0} from user {1}".format(db, grantee))

    # The user can no longer grant privileges on the database. In Ranger it is
    # currently not possible to revoke the grant option for a single access type.
    result = self.execute_query(show_grant_stmt)
    TestRanger._check_privileges(result, insert_without_grant_option())
  finally:
    admin_client.execute("revoke insert on database {0} from user {1}"
        .format(db, grantee))
    admin_client.execute(drop_db_stmt)
|
|
|
|
@pytest.mark.execute_serially
def test_show_grant(self, unique_name):
  """Runs the SHOW GRANT helper checks for the current OS user and for the group
  named after that user, on a temporary database and table."""
  user = getuser()
  group = grp.getgrnam(user).gr_name
  # (principal name, principal type) pairs to run the per-principal checks for.
  principals = [(user, "USER"), (group, "GROUP")]
  admin_client = self.create_impala_client(user=ADMIN)
  db = unique_name + "_db"
  tbl = unique_name + "_tbl"
  udf = "identity"
  drop_db_stmt = "drop database if exists {0} cascade".format(db)

  try:
    # Create the test database and table.
    admin_client.execute(drop_db_stmt)
    admin_client.execute("create database {0}".format(db))
    admin_client.execute("create table {0}.{1} (x int)".format(db, tbl))

    for principal_name, principal_type in principals:
      # Basic show grant functionality for the user/group.
      self._test_show_grant_basic(admin_client, principal_type, principal_name, db,
                                  tbl, udf)
      # Omitting ON <resource> results in failure.
      self._test_show_grant_without_on(principal_type, principal_name)

      # Inherited privileges (server privileges show for database, etc.).
      self._test_show_grant_inherited(admin_client, principal_type, principal_name,
                                      db, tbl)

    # The ALL privilege hides other privileges.
    self._test_show_grant_mask(admin_client, user)

    # The ALL privilege on a UDF hides other privileges.
    # NOTE(review): this runs once, with the principal of the last loop iteration (the
    # GROUP entry) -- confirm that a per-principal run is not intended.
    self._test_show_grant_mask_on_udf(admin_client, principal_type, principal_name,
                                      db, udf)

    # A USER inherits the privileges of their GROUP.
    self._test_show_grant_user_group(admin_client, user, group, db)
  finally:
    admin_client.execute(drop_db_stmt)
|
|
|
|
@pytest.mark.execute_serially
def test_grant_revoke_ranger_api(self, unique_name):
  """Grants and revokes privileges through the Ranger REST API and checks that SHOW
  GRANT reflects each change only after REFRESH AUTHORIZATION."""
  user = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  db = unique_name + "_db"
  resource = {"database": db, "column": "*", "table": "*"}
  access = ["select", "create"]
  drop_db_stmt = "drop database if exists {0} cascade".format(db)
  show_grant_stmt = "show grant user {0} on database {1}".format(user, db)

  def rows_for_granted_access():
    # Rows SHOW GRANT is expected to return while the REST-granted privileges are
    # visible to Impala.
    return [
        ["USER", user, db, "*", "*", "", "", "", "", "create", "false"],
        ["USER", user, db, "*", "*", "", "", "", "", "select", "false"]
    ]

  try:
    # Create the test database.
    admin_client.execute(drop_db_stmt)
    admin_client.execute("create database {0}".format(db))

    # Grant the privileges via the Ranger REST API.
    TestRanger._grant_ranger_privilege(user, resource, access)

    # Until a refresh, Impala still reports the stale (empty) privileges.
    stale = self.client.execute(show_grant_stmt)
    TestRanger._check_privileges(stale, [])

    # After the refresh the granted privileges show up.
    admin_client.execute("refresh authorization")
    refreshed = self.client.execute(show_grant_stmt)
    TestRanger._check_privileges(refreshed, rows_for_granted_access())

    # Revoke the privileges via the Ranger REST API.
    TestRanger._revoke_ranger_privilege(user, resource, access)

    # Until a refresh, Impala still reports the stale (granted) privileges.
    stale = self.client.execute(show_grant_stmt)
    TestRanger._check_privileges(stale, rows_for_granted_access())

    # After the refresh the privileges are gone.
    admin_client.execute("refresh authorization")
    refreshed = self.client.execute(show_grant_stmt)
    TestRanger._check_privileges(refreshed, [])
  finally:
    admin_client.execute("revoke all on database {0} from user {1}"
        .format(db, user))
    admin_client.execute(drop_db_stmt)
|
|
|
|
@pytest.mark.execute_serially
def test_unsupported_sql(self):
  """Tests unsupported SQL statements when running with Ranger."""
  admin_client = self.create_impala_client(user=ADMIN)
  # SHOW GRANT without an ON <resource> clause is expected to be rejected.
  failure = self.execute_query_expect_failure(admin_client, "show grant role foo")
  expected_msg = ("UnsupportedOperationException: SHOW GRANT is not supported without a "
                  "defined resource in Ranger.")
  assert expected_msg in str(failure)
|
|
|
|
@pytest.mark.execute_serially
def test_grant_revoke_invalid_principal(self):
  """Tests grant/revoke to/from invalid principal should return more readable
  error messages.

  Three cases are covered, each for both GRANT and REVOKE: an invalid grantor user,
  an invalid grantee user and an invalid grantee group. Every statement is expected
  to fail with a Ranger error message naming the principal that doesn't exist.
  """
  valid_user = "admin"
  invalid_user = "invalid_user"
  invalid_group = "invalid_group"
  clients = []
  try:
    # TODO(IMPALA-8640): Two separate Impala clients are created, one per user, to
    # work around the bug.
    invalid_impala_client = self.create_impala_client(user=invalid_user)
    clients.append(invalid_impala_client)
    valid_impala_client = self.create_impala_client(user=valid_user)
    clients.append(valid_impala_client)

    # (client issuing the statement, principal type, principal name,
    #  description of the missing principal in the expected error message)
    cases = [
        (invalid_impala_client, "user", getuser(),
         "Grantor user {0}".format(invalid_user)),
        (valid_impala_client, "user", invalid_user,
         "Grantee user {0}".format(invalid_user)),
        (valid_impala_client, "group", invalid_group,
         "Grantee group {0}".format(invalid_group)),
    ]
    for client, principal_type, principal, missing_principal in cases:
      # The action word of the error message is tied to each statement explicitly
      # instead of being derived from a substring search in the statement, which
      # would misfire for a principal name that contains "grant".
      statements = [
          ("granting", "grant select on table functional.alltypes to {0} {1}"
              .format(principal_type, principal)),
          ("revoking", "revoke select on table functional.alltypes from {0} {1}"
              .format(principal_type, principal)),
      ]
      for action, statement in statements:
        result = self.execute_query_expect_failure(client, statement)
        expected_err = ("Error {0} a privilege in Ranger. Ranger error message: "
                        "HTTP 403 Error: {1} doesn't exist"
                        .format(action, missing_principal))
        assert expected_err in str(result)
  finally:
    # Close the clients even when an expectation above fails.
    for client in clients:
      client.close()
|
|
|
|
@pytest.mark.execute_serially
def test_show_functions(self, unique_name):
  """Checks that SHOW FUNCTIONS on a database is rejected for a user without any
  privilege on it, and lists the function once any single database-level privilege
  from the list below has been granted."""
  grantee = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  db = unique_name + "_db"
  db_privileges = ["ALTER", "DROP", "CREATE", "INSERT", "SELECT", "REFRESH"]
  fs_prefix = os.getenv("FILESYSTEM_PREFIX") or ""
  show_functions_stmt = "show functions in {0}".format(db)
  try:
    # Set up the temporary database and a function in it.
    admin_client.execute("drop database if exists {0} cascade".format(db))
    admin_client.execute("create database {0}".format(db))
    create_fn_stmt = ("create function {0}.foo() RETURNS"
                      " int LOCATION '{1}/test-warehouse/libTestUdfs.so'"
                      "SYMBOL='Fn'".format(db, fs_prefix))
    self.execute_query_expect_success(admin_client, create_fn_stmt)

    # "show functions" with no privilege granted.
    denied = self._run_query_as_user(show_functions_stmt, grantee, False)
    expected_err = "User '{0}' does not have privileges to access: {1}.*.*" \
        .format(grantee, db)
    assert expected_err in str(denied)

    for db_privilege in db_privileges:
      try:
        # Grant the privilege.
        self.execute_query_expect_success(
            admin_client,
            "grant {0} on database {1} to user {2}"
            .format(db_privilege, db, grantee))
        # "show functions" with only the current privilege granted.
        listing = self._run_query_as_user(show_functions_stmt, grantee, True)
        assert "foo()" in listing.get_data()
      finally:
        # Revoke the privilege.
        admin_client.execute("revoke {0} on database {1} from user {2}"
            .format(db_privilege, db, grantee))
  finally:
    # Drop the database.
    self._run_query_as_user("drop database {0} cascade".format(db), ADMIN, True)
|
|
|
|
@pytest.mark.execute_serially
def test_select_function(self, unique_name):
  """Verifies that to execute a UDF in a database, a user has to be granted a) the
  SELECT privilege on the UDF, and b) any of the SELECT, INSERT, REFRESH privileges on
  all the tables, columns in the database."""
  grantee = "non_owner"
  admin_client = self.create_impala_client(user=ADMIN)
  db = unique_name + "_db"
  fs_prefix = os.getenv("FILESYSTEM_PREFIX") or ""
  # Error expected whenever 'grantee' lacks the SELECT privilege on the UDF.
  no_select_on_udf_err = "User '{0}' does not have privileges to SELECT functions in: " \
      "{1}.identity".format(grantee, db)
  show_db_grants_stmt = "show grant user {0} on database {1}".format(grantee, db)
  show_udf_grants_stmt = "show grant user {0} on user_defined_fn {1}.identity" \
      .format(grantee, db)
  view_metadata_privileges = ["select", "insert", "refresh"]

  def udf_select_row():
    # Row reported for the explicit SELECT grant on the UDF 'identity'.
    return ["USER", grantee, db, "", "", "", "", "", "identity", "select", "false"]

  try:
    # Create a temporary database and a user-defined function.
    admin_client.execute("drop database if exists {0} cascade".format(db))
    admin_client.execute("create database {0}".format(db))
    admin_client.execute("create function {0}.identity(bigint) "
                         "RETURNS bigint "
                         "LOCATION "
                         "'{1}/test-warehouse/impala-hive-udfs.jar' "
                         "SYMBOL='org.apache.impala.TestUdf'"
                         .format(db, fs_prefix))
    # Create a temporary table and grant 'grantee' the INSERT privilege on it, which
    # is necessary for 'grantee' to insert values into a table.
    admin_client.execute("create table {0}.tbl (id bigint)".format(db))
    admin_client.execute("grant insert on table {0}.tbl to user {1}"
        .format(db, grantee))

    stmts = ["select {0}.identity(1)".format(db),
             "insert into {0}.tbl values ({0}.identity(1))".format(db)]
    for stmt in stmts:
      # A user not granted any privilege is not allowed to execute the UDF.
      result = self._run_query_as_user(stmt, grantee, False)
      assert no_select_on_udf_err in str(result)

      for db_privilege in view_metadata_privileges:
        revoke_on_db_stmt = "revoke {0} on database {1} from user {2}" \
            .format(db_privilege, db, grantee)
        try:
          # A user is allowed to execute a UDF in a database if the user has been
          # granted the SELECT privilege on the database. Such a privilege covers all
          # the tables, columns, as well as UDFs in the database.
          self._update_privileges_and_verify(
              admin_client,
              "grant {0} on database {1} to user {2}"
              .format(db_privilege, db, grantee),
              show_db_grants_stmt, [
                  ["USER", grantee, db, "", "", "", "", "", "*",
                   db_privilege, "false"],
                  ["USER", grantee, db, "*", "*", "", "", "", "",
                   db_privilege, "false"]])
          # The query succeeds only if 'db_privilege' is "select".
          if db_privilege == "select":
            self._run_query_as_user(stmt, grantee, True)
          else:
            result = self._run_query_as_user(stmt, grantee, False)
            assert no_select_on_udf_err in str(result)

          # A user not being granted the SELECT privilege on any UDF in the database
          # is not allowed to execute the UDF even though the user has the
          # 'db_privilege' privilege on all the tables, columns in the database.
          self._update_privileges_and_verify(
              admin_client,
              "revoke {0} on user_defined_fn {1}.`*` from user {2}"
              .format(db_privilege, db, grantee),
              show_db_grants_stmt, [
                  ["USER", grantee, db, "*", "*", "", "", "", "",
                   db_privilege, "false"]])
          result = self._run_query_as_user(stmt, grantee, False)
          assert no_select_on_udf_err in str(result)

          # A user is allowed to execute the UDF if the user is explicitly granted
          # the SELECT privilege on the UDF.
          self._update_privileges_and_verify(
              admin_client,
              "grant select on user_defined_fn {0}.identity to user {1}"
              .format(db, grantee),
              show_udf_grants_stmt, [udf_select_row()])
          self._run_query_as_user(stmt, grantee, True)

          # Even though a user is explicitly granted the SELECT privilege on the UDF,
          # the user is not allowed to execute the UDF if the user is not granted any
          # of the SELECT, INSERT, or REFRESH privileges on all the tables and
          # columns in the database.
          self._update_privileges_and_verify(
              admin_client, revoke_on_db_stmt, show_db_grants_stmt, [])
          udf_grants = admin_client.execute(show_udf_grants_stmt)
          TestRanger._check_privileges(udf_grants, [udf_select_row()])
          result = self._run_query_as_user(stmt, grantee, False)
          no_db_access_err = "User '{0}' does not have privileges to access: {1}" \
              .format(grantee, db)
          assert no_db_access_err in str(result)
        finally:
          # Revoke the granted privileges.
          admin_client.execute(revoke_on_db_stmt)
          admin_client.execute("revoke select on user_defined_fn {0}.identity "
                               "from user {1}"
                               .format(db, grantee))
  finally:
    # Revoke the granted privilege on the temporary table.
    self._run_query_as_user("revoke insert on table {0}.tbl from user {1}"
                            .format(db, grantee),
                            ADMIN, True)
    # Drop the database.
    self._run_query_as_user("drop database {0} cascade".format(db), ADMIN, True)
|
|
|
|
@pytest.mark.execute_serially
def test_select_function_with_fallback_db(self, unique_name):
  """Verifies that Impala should not allow using functions in the fallback database
  unless the user has been granted sufficient privileges on the given database."""
  grantee = "non_owner"
  admin_client = self.create_impala_client(user=ADMIN)
  non_owner_client = self.create_impala_client(user=grantee)
  refresh_stmt = "refresh authorization"
  fallback_db = unique_name + "_db"
  # Query option that makes 'fallback_db' the fallback database for functions.
  fallback_options = {'FALLBACK_DB_FOR_FUNCTIONS': fallback_db}
  unqualified_udf_query = "select identity(1)"

  try:
    admin_client.execute("drop database if exists {0} cascade".format(fallback_db))
    admin_client.execute("create database %s" % fallback_db)
    admin_client.execute("create function {0}.identity(bigint) "
                         "RETURNS bigint "
                         "LOCATION "
                         "'{1}/libTestUdfs.so' "
                         "SYMBOL='Identity'"
                         .format(fallback_db, WAREHOUSE))
    # A user not granted any privilege is not allowed to execute the UDF.
    result = self._run_query_as_user(unqualified_udf_query, grantee, False)
    expected_err = "User '{0}' does not have privileges to SELECT functions in: " \
        "default.identity".format(grantee)
    assert expected_err in str(result)

    admin_client.execute(
        "grant select on database default to user {0}".format(grantee))
    self._refresh_authorization(admin_client, refresh_stmt)

    # With access to 'default' and no fallback database set, the failure is about
    # the function being unknown rather than about privileges.
    result = self._run_query_as_user(unqualified_udf_query, grantee, False)
    expected_err = "default.identity() unknown for database default."
    assert expected_err in str(result)

    # A user is not allowed to access fallback database if the user has no
    # privileges on it, whether the function exists or not.
    denied_in_fallback_db = [
        (unqualified_udf_query,
         "User '{0}' does not have privileges to SELECT functions in: "
         "{1}.identity".format(grantee, fallback_db)),
        ("select fn()",
         "User '{0}' does not have privileges to SELECT functions in: "
         "{1}.fn".format(grantee, fallback_db)),
    ]
    for query, expected_err in denied_in_fallback_db:
      result = self.execute_query_expect_failure(
          non_owner_client, query, query_options=fallback_options)
      assert expected_err in str(result)

    # A user has to be granted a) any of the INSERT, REFRESH, SELECT privileges on all
    # the tables and columns in the fallback database, and b) the SELECT privilege on
    # the UDF in the fallback database in order to execute the UDF.
    admin_client.execute(
        "grant insert on database {0} to user {1}".format(fallback_db, grantee))
    admin_client.execute(
        "grant select on user_defined_fn {0}.identity to user {1}".format(
            fallback_db, grantee))
    self._refresh_authorization(admin_client, refresh_stmt)

    # A user is allowed to use functions in the fallback database if the user is
    # explicitly granted the SELECT privilege.
    self.execute_query_expect_success(
        non_owner_client, unqualified_udf_query, query_options=fallback_options)
  finally:
    # Revoke the granted privileges.
    admin_client.execute("revoke select on database default from user {0}"
        .format(grantee))
    admin_client.execute("revoke insert on database {0} from user {1}"
        .format(fallback_db, grantee))
    admin_client.execute("revoke select on user_defined_fn {0}.identity from user {1}"
        .format(fallback_db, grantee))
    # Drop the database.
    self._run_query_as_user("drop database {0} cascade".format(fallback_db),
                            ADMIN, True)
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_column_masking(self, vector, unique_name):
  """Run the column masking test files with Ranger masking policies in place.

  Masking policies for the current OS user are added on tables and views of the
  'functional*' databases, authorization is refreshed through an admin client and
  'QueryTest/ranger_column_masking' is run. Afterwards three more policies are added
  on 'complextypestbl' in each of 'functional_parquet' and 'functional_orc_def', each
  followed by a run of 'QueryTest/ranger_column_masking_complex_types' in that
  database. The CREATE grant, the scratch database and every policy that was added
  are removed in the 'finally' block.
  """
  user = getuser()
  unique_database = unique_name + '_db'
  # The current user is not privileged to create/drop databases or to refresh
  # authorization, hence the dedicated admin connection.
  admin_client = self.create_impala_client(user=ADMIN)
  admin_client.execute("drop database if exists %s cascade" % unique_database)
  admin_client.execute("create database %s" % unique_database)
  # CTAS, CREATE VIEW etc. in the test file need CREATE on the scratch database.
  admin_client.execute("grant create on database %s to user %s"
                       % (unique_database, user))

  # Trailing arguments of _add_column_masking_policy() for each policy, in creation
  # order: (db, table, column, mask type[, custom mask expression]).
  table_policies = [
    # Custom mask that uses the column name 'id' directly.
    ("functional", "alltypestiny", "id", "CUSTOM", "id * 100"),
    ("functional", "alltypestiny", "bool_col", "MASK_NULL"),
    # Custom mask that uses the column reference '{col}'.
    ("functional", "alltypestiny", "string_col", "CUSTOM", "concat({col}, 'aaa')"),
    # Policy on a view.
    ("functional", "alltypes_view", "string_col", "CUSTOM", "concat('vvv', {col})"),
    # Policies on the table used in the view.
    ("functional", "alltypes", "id", "CUSTOM", "{col} * 100"),
    ("functional", "alltypes", "string_col", "CUSTOM", "concat({col}, 'ttt')"),
    # Mask "bigint_col" using a subquery. It will hit IMPALA-10483.
    ("functional", "alltypesagg", "bigint_col", "CUSTOM",
     "(select count(*) from functional.alltypestiny)"),
    # Policy for the virtual column INPUT__FILE__NAME.
    ("functional", "alltypestiny", "input__file__name", "CUSTOM",
     "mask_show_last_n({col}, 10, 'x', 'x', 'x', -1, '1')"),
    # Policy on an Iceberg table.
    ("functional_parquet", "iceberg_partitioned", "id", "MASK_NULL"),
    # Policy on an Iceberg V2 table.
    ("functional_parquet", "iceberg_v2_delete_positional", "data", "MASK_NULL"),
    # Invalid policy to trigger an error during re-analyze.
    ("functional_parquet", "alltypessmall", "string_col", "CUSTOM",
     "concat(string_col, invalid_col)"),
  ]
  # (column, mask type[, custom mask expression]) for 'complextypestbl'.
  complex_type_policies = [
    # A primitive column of a table which contains nested columns.
    ("id", "CUSTOM", "100 * {col}"),
    # Nested columns, though these policies won't be recognized (same as Hive).
    ("nested_struct.a", "CUSTOM", "100 * {col}"),
    ("int_array", "MASK_NULL"),
  ]

  # Number of policies created so far; policy names are unique_name + index, which
  # is what the cleanup relies on.
  policy_cnt = 0
  try:
    for policy_args in table_policies:
      TestRanger._add_column_masking_policy(
          unique_name + str(policy_cnt), user, *policy_args)
      policy_cnt += 1
    self.execute_query_expect_success(admin_client, "refresh authorization")
    self.run_test_case("QueryTest/ranger_column_masking", vector,
                       test_file_vars={'$UNIQUE_DB': unique_database})

    for db in ['functional_parquet', 'functional_orc_def']:
      for column_args in complex_type_policies:
        TestRanger._add_column_masking_policy(
            unique_name + str(policy_cnt), user, db, "complextypestbl", *column_args)
        policy_cnt += 1
      self.execute_query_expect_success(admin_client, "refresh authorization")
      self.run_test_case("QueryTest/ranger_column_masking_complex_types", vector,
                         use_db=db)
  finally:
    admin_client.execute("revoke create on database %s from user %s"
                         % (unique_database, user))
    admin_client.execute("drop database %s cascade" % unique_database)
    for policy_idx in range(policy_cnt):
      TestRanger._remove_policy(unique_name + str(policy_idx))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_block_metadata_update(self, unique_name):
  """Verify that INVALIDATE METADATA on a table is denied to a user who has a column
  masking policy on any column of that table.

  The denial has to hold even when the table metadata (e.g., the list of columns) was
  invalidated right before the user issues the statement. Without loading the table
  metadata for ResetMetadataStmt this test would have failed.
  """
  user = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  non_owner_client = self.create_impala_client(user=user)
  invalidate_stmt = "invalidate metadata functional.alltypestiny"
  try:
    TestRanger._add_column_masking_policy(
        unique_name, user, "functional", "alltypestiny", "id", "CUSTOM", "id * 100")
    # Invalidate as admin first so that the metadata is not loaded when the masked
    # user issues the same statement.
    self.execute_query_expect_success(admin_client, invalidate_stmt)
    # The user holds ALL on the server when the statement is issued; the failure
    # message below is still expected.
    admin_client.execute("grant all on server to user {0}".format(user))
    result = self.execute_query_expect_failure(non_owner_client, invalidate_stmt)
    expected_error = (
        "User '{0}' does not have privileges to execute "
        "'INVALIDATE METADATA/REFRESH' on: functional.alltypestiny".format(user))
    assert expected_error in str(result)
  finally:
    TestRanger._remove_policy(unique_name)
    admin_client.execute("revoke all on server from user {0}".format(user))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_masking_overload_coverage(self, vector, unique_name):
  """Test that we cover all the overloads of the masking functions that could appear
  when using the default mask types.

  For every default mask type, a policy of that type is added on each listed column
  of functional.alltypestiny, functional.date_tbl and functional.chars_tiny, the
  matching 'QueryTest/ranger_alltypes_<mask type>' file is run, and the policies are
  removed again before the next mask type is tried.
  """
  user = getuser()
  # Names of the policies that currently exist and still have to be removed.
  policy_names = []
  # The current user is not privileged to refresh authorization, hence the dedicated
  # admin connection.
  admin_client = self.create_impala_client(user=ADMIN)
  default_mask_types = ["MASK", "MASK_SHOW_LAST_4", "MASK_SHOW_FIRST_4", "MASK_HASH",
                        "MASK_NULL", "MASK_NONE", "MASK_DATE_SHOW_YEAR"]
  # (table in 'functional', columns to mask), processed in this order.
  masked_tables = [
    ("alltypestiny", ["bool_col", "tinyint_col", "smallint_col", "int_col",
                      "bigint_col", "float_col", "double_col", "date_string_col",
                      "string_col", "timestamp_col", "year"]),
    ("date_tbl", ["date_col", "date_part"]),
    ("chars_tiny", ["cs", "cl", "vc"]),
  ]
  try:
    for mask_type in default_mask_types:
      LOG.info("Testing default mask type " + mask_type)
      for table, columns in masked_tables:
        for col in columns:
          policy_name = "_".join((unique_name, col, mask_type))
          TestRanger._add_column_masking_policy(
              policy_name, user, "functional", table, col, mask_type)
          policy_names.append(policy_name)

      self.execute_query_expect_success(admin_client, "refresh authorization")
      self.run_test_case("QueryTest/ranger_alltypes_" + mask_type.lower(), vector)
      # Remove this round's policies before the next mask type is applied to the
      # same columns.
      while policy_names:
        TestRanger._remove_policy(policy_names.pop())
  finally:
    # Remove whatever is left over if a round was interrupted by a failure.
    while policy_names:
      TestRanger._remove_policy(policy_names.pop())
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_row_filtering(self, vector, unique_name):
  """Exercise Ranger row-filtering policies for one user and for several users.

  The test has three parts:
    1. Row filters for the current OS user on several tables and a view, checked by
       running 'QueryTest/ranger_row_filtering'.
    2. One policy carrying a different filter per user on
       functional_parquet.alltypestiny, checked by querying as each user.
    3. A filter on tpch.customer containing nested subqueries that depend on
       current_user(), checked with _verified_multiuser_results().

  All policies, grants and the scratch database created here are removed in the
  'finally' block; failures of individual cleanup statements are logged and ignored.
  """
  user = getuser()
  unique_database = unique_name + '_db'
  # The current user is not privileged to create/drop databases or to refresh
  # authorization, hence the dedicated admin connection.
  admin_client = self.create_impala_client(user=ADMIN)
  admin_client.execute("drop database if exists %s cascade" % unique_database)
  admin_client.execute("create database %s" % unique_database)
  # Grant CREATE on database to current user for tests on CTAS, CreateView etc.
  # Note that 'user' is the owner of the test tables. No additional GRANTs are required.
  admin_client.execute("grant create on database %s to user %s"
                       % (unique_database, user))

  other_users = ["non_owner", "non_owner_2"]
  all_users = [user] + other_users

  # (db, table, filter expression) of the policies for the current user, in creation
  # order.
  single_user_filters = [
    ("functional", "alltypestiny", "id % 2 = 0"),
    # A filter using builtin functions.
    ("functional", "alltypessmall",
     """(string_col = concat('0', '') and id <= 0) or
     (string_col = '1' and bool_col = true and id > 90)"""),
    ("functional", "alltypes", "year = 2009 and month = 1"),
    # A filter using a nonexisting column 'test_id'. Queries on this table will fail
    # in resolving the column.
    ("functional_parquet", "alltypes", "test_id = id"),
    # An illegal row filter that could cause parsing error.
    ("functional_parquet", "alltypessmall", "100 id = int_col"),
    # A filter on a view. 'alltypes_view' is a view on table 'alltypes' which also
    # has a row-filtering policy. They will both be performed.
    ("functional", "alltypes_view", "id < 5"),
    # Filter expr using a subquery on the current table.
    ("functional", "alltypesagg",
     "id = (select min(id) from functional.alltypesagg)"),
    # Filter expr using a subquery on other tables.
    ("functional_parquet", "alltypesagg",
     "id in (select id from functional.alltypestiny)"),
    # Filter expr on nested types.
    ("functional_parquet", "complextypestbl", "nested_struct.a is not NULL"),
    # Filter expr on an Iceberg table.
    ("functional_parquet",
     "iceberg_v2_positional_not_all_data_files_have_delete_files", "i % 2 = 1"),
  ]

  # Number of policies created so far; policy names are unique_name + index, which
  # is what the cleanup relies on.
  policy_cnt = 0
  try:
    #######################################################
    # Test row filters on current user
    #######################################################
    for db, table, row_filter in single_user_filters:
      TestRanger._add_row_filtering_policy(
          unique_name + str(policy_cnt), user, db, table, row_filter)
      policy_cnt += 1
    admin_client.execute("refresh authorization")
    self.run_test_case("QueryTest/ranger_row_filtering", vector,
                       test_file_vars={'$UNIQUE_DB': unique_database})

    #######################################################
    # Test row filter policy on multiple users
    #######################################################
    TestRanger._add_multiuser_row_filtering_policy(
        unique_name + str(policy_cnt), "functional_parquet", "alltypestiny",
        list(all_users), ["id=0", "id=1", "id=2"])
    policy_cnt += 1
    for grantee in other_users:
      admin_client.execute(
          "grant select on table functional_parquet.alltypestiny to user " + grantee)
    admin_client.execute("refresh authorization")
    non_owner_client = self.create_impala_client(user="non_owner")
    non_owner_2_client = self.create_impala_client(user="non_owner_2")
    # Clients in the same order as 'all_users'.
    user_clients = [self.client, non_owner_client, non_owner_2_client]
    # Each user is left with exactly one row, so max(id) equals that single id.
    for query in ("select id from functional_parquet.alltypestiny",
                  "select max(id) from functional_parquet.alltypestiny"):
      for client, expected_id in zip(user_clients, ["0", "1", "2"]):
        assert client.execute(query).get_data() == expected_id

    #######################################################
    # Test row filters that contains complex subqueries
    #######################################################
    admin_client.execute("""create table %s.employee (
        e_id bigint,
        e_name string,
        e_nation string)
        stored as textfile""" % unique_database)
    # The names must match the user names exactly: the row filter below compares
    # e_name against current_user(). The third row used to be 'non_owner2', which
    # matched no user and left 'non_owner_2' with an empty result on both sides of
    # the comparison.
    admin_client.execute("""insert into %s.employee values
        (0, '%s', 'CHINA'),
        (1, 'non_owner', 'PERU'),
        (2, 'non_owner_2', 'IRAQ')
        """ % (unique_database, user))
    for grantee in all_users:
      admin_client.execute("grant select on table %s.employee to user %s"
                           % (unique_database, grantee))
    for grantee in all_users:
      admin_client.execute("grant select on database tpch to user %s" % grantee)

    # Each employee can only see customers in the same nation. The remaining '%s' is
    # filled with current_user() for the policy and with a quoted user name for the
    # equivalent admin query.
    row_filter_tmpl = """c_nationkey in (
        select n_nationkey from tpch.nation
        where n_name in (
          select e_nation from {db}.employee
          where e_name = %s
        )
      )""".format(db=unique_database)
    TestRanger._add_multiuser_row_filtering_policy(
        unique_name + str(policy_cnt), "tpch", "customer",
        list(all_users), [row_filter_tmpl % "current_user()"])
    policy_cnt += 1
    admin_client.execute("refresh authorization")

    user_query = "select count(*) from tpch.customer"
    admin_query_tmpl = user_query + " where " + row_filter_tmpl
    self._verified_multiuser_results(
        admin_client, admin_query_tmpl, user_query, list(all_users),
        list(user_clients))

    tpch_q10_tmpl = """select c_custkey, c_name,
        sum(l_extendedprice * (1 - l_discount)) as revenue,
        c_acctbal, n_name, c_address, c_phone, c_comment
      from {customer_place_holder}, tpch.orders, tpch.lineitem, tpch.nation
      where
        c_custkey = o_custkey and l_orderkey = o_orderkey
        and o_orderdate >= '1993-10-01' and o_orderdate < '1994-01-01'
        and l_returnflag = 'R' and c_nationkey = n_nationkey
      group by c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment
      order by revenue desc, c_custkey
      limit 20"""
    user_query = tpch_q10_tmpl.format(customer_place_holder="tpch.customer")
    admin_value = "(select * from tpch.customer where " + row_filter_tmpl + ") v"
    admin_query_tmpl = tpch_q10_tmpl.format(customer_place_holder=admin_value)
    self._verified_multiuser_results(
        admin_client, admin_query_tmpl, user_query, list(all_users),
        list(user_clients))
  finally:
    for policy_idx in range(policy_cnt):
      TestRanger._remove_policy(unique_name + str(policy_idx))
    # Undo every grant issued above and remove the scratch database. The revokes on
    # the employee table have to precede the drop of its database.
    cleanup_statements = [
      "revoke select on table functional_parquet.alltypestiny from user non_owner",
      "revoke select on table functional_parquet.alltypestiny from user non_owner_2",
      "revoke select on database tpch from user %s" % user,
      "revoke select on database tpch from user non_owner",
      "revoke select on database tpch from user non_owner_2",
      "revoke select on table %s.employee from user %s" % (unique_database, user),
      "revoke select on table %s.employee from user non_owner" % unique_database,
      "revoke select on table %s.employee from user non_owner_2" % unique_database,
      "revoke create on database %s from user %s" % (unique_database, user),
      "drop database if exists %s cascade" % unique_database,
    ]
    for statement in cleanup_statements:
      # Best effort: depending on where the test stopped, some of these objects or
      # grants may not exist.
      try:
        admin_client.execute(statement, user=ADMIN)
      except Exception as e:
        LOG.error("Ignored exception in cleanup: " + str(e))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_column_masking_and_row_filtering(self, vector, unique_name):
  """Run 'QueryTest/ranger_column_masking_and_row_filtering' with column masking and
  row filtering policies defined together for the current user.

  Every policy that was added is removed in the 'finally' block.
  """
  user = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  add_mask = TestRanger._add_column_masking_policy
  add_filter = TestRanger._add_row_filtering_policy
  # (function creating the policy, its arguments after policy name and user), in
  # creation order.
  policies = [
    # 3 column masking policies and 1 row filtering policy on functional.alltypestiny.
    # The row filtering policy will take effect first, then the column masking
    # policies mask the final results.
    # The first mask uses the column name 'id' directly.
    (add_mask, ("functional", "alltypestiny", "id", "CUSTOM", "id + 100")),
    (add_mask, ("functional", "alltypestiny", "string_col", "MASK_NULL")),
    (add_mask, ("functional", "alltypestiny", "date_string_col", "MASK")),
    (add_filter, ("functional", "alltypestiny", "id % 3 = 0")),
    # 2 column masking policies on functional.alltypes and 1 row filtering policy on
    # functional.alltypes_view which is a view on functional.alltypes. The column
    # masking policies on functional.alltypes will take effect first, which affects
    # the results of functional.alltypes_view. Then the row filtering policy of the
    # view filters out rows of functional.alltypes_view.
    # The first mask uses the column name 'id' directly.
    (add_mask, ("functional", "alltypes", "id", "CUSTOM", "-id")),
    (add_mask, ("functional", "alltypes", "date_string_col", "MASK")),
    (add_filter, ("functional", "alltypes_view",
                  "id >= -8 and date_string_col = 'nn/nn/nn'")),
  ]
  # Number of policies created so far; policy names are unique_name + index, which
  # is what the cleanup relies on.
  policy_cnt = 0
  try:
    for add_policy, policy_args in policies:
      add_policy(unique_name + str(policy_cnt), user, *policy_args)
      policy_cnt += 1

    self.execute_query_expect_success(admin_client, "refresh authorization")
    self.run_test_case("QueryTest/ranger_column_masking_and_row_filtering", vector)
  finally:
    for policy_idx in range(policy_cnt):
      TestRanger._remove_policy(unique_name + str(policy_idx))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_iceberg_time_travel_with_masking(self, unique_name):
  """When we do a time travel query on an iceberg table we will use the schema from
  the time of the snapshot. Make sure this works when column masking is being used.

  An Iceberg table with two snapshots is created; column 'a' is dropped between the
  two inserts. Both snapshots are queried without masking, with column 'C' masked to
  NULL, without masking again, and finally with column 'B' masked to NULL. The grants
  and the scratch database are removed in the 'finally' block.
  """
  user = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  short_table_name = "ice_1"
  unique_database = unique_name + "_db"
  tbl_name = unique_database + "." + short_table_name
  # Database-level privileges handed to 'user' below and revoked again at the end.
  granted_privileges = ("select", "insert")

  try:
    admin_client.execute("drop database if exists {0} cascade"
                         .format(unique_database))
    admin_client.execute("create database {0}".format(unique_database))
    admin_client.execute("create table {0} (a int, b string, c int) stored as iceberg"
                         .format(tbl_name))
    admin_client.execute("insert into {0} values (1, 'one', 1)".format(tbl_name))
    admin_client.execute("alter table {0} drop column a".format(tbl_name))
    admin_client.execute("insert into {0} values ('two', 2)".format(tbl_name))
    for privilege in granted_privileges:
      admin_client.execute("grant {0} on database {1} to user {2} with "
                           "grant option".format(privilege, unique_database, user))

    snapshots = get_snapshots(admin_client, tbl_name, expected_result_size=2)
    # Create two versions of a simple query based on the two snapshot ids.
    query = "select * from {0} FOR SYSTEM_VERSION AS OF {1}"
    # The first query is for the data after the first insert.
    first_time_travel_query = query.format(tbl_name, snapshots[0].get_snapshot_id())
    # The expected results of the first query.
    first_query_columns = ['A', 'B', 'C']
    first_results_unmasked = ['1\tone\t1']
    first_results_masked_b = ['1\tNULL\t1']  # Column 'B' is masked to NULL.
    first_results_masked_c = ['1\tone\tNULL']  # Column 'C' is masked to NULL.

    # Second query is for the data after the second insert, when the column has gone.
    second_time_travel_query = query.format(tbl_name, snapshots[1].get_snapshot_id())
    # The expected results of the second query, depending on masking.
    second_query_columns = ['B', 'C']
    second_results_unmasked = ['one\t1', 'two\t2']
    second_results_masked_b = ['NULL\t1', 'NULL\t2']  # Column 'B' masked to NULL.
    second_results_masked_c = ['one\tNULL', 'two\tNULL']  # Column 'C' masked to NULL.

    def verify_time_travel(first_expected, second_expected):
      """Run both time travel queries as 'user' and compare labels and rows."""
      results = self.client.execute(first_time_travel_query)
      assert results.column_labels == first_query_columns
      assert results.data == first_expected

      results = self.client.execute(second_time_travel_query)
      assert results.column_labels == second_query_columns
      # The second snapshot holds two rows whose order is not asserted, so the row
      # count is compared and each expected row is looked up.
      assert len(results.data) == len(second_expected)
      for row in second_expected:
        assert row in results.data

    # Run queries without column masking.
    verify_time_travel(first_results_unmasked, second_results_unmasked)

    try:
      # Mask column C to null.
      TestRanger._add_column_masking_policy(
          unique_name, user, unique_database, short_table_name, "C", "MASK_NULL")
      admin_client.execute("refresh authorization")
      # Time travel should still work, but column 'C' is masked.
      verify_time_travel(first_results_masked_c, second_results_masked_c)
    finally:
      # Remove the masking policy.
      TestRanger._remove_policy(unique_name)
      admin_client.execute("refresh authorization")

    # Run the queries again without masking as we are here.
    verify_time_travel(first_results_unmasked, second_results_unmasked)

    try:
      # Mask column B to null.
      TestRanger._add_column_masking_policy(
          unique_name, user, unique_database, short_table_name, "B", "MASK_NULL")
      admin_client.execute("refresh authorization")
      # Time travel should still work, but column 'B' is masked. This goes through
      # the same helper as the other checks, so the row count is verified here too.
      verify_time_travel(first_results_masked_b, second_results_masked_b)
    finally:
      TestRanger._remove_policy(unique_name)
  finally:
    # Take back the database-level grants before the database goes away. This is
    # best effort because the test may have stopped before the database or the
    # grants existed; the drop below must run regardless.
    for privilege in granted_privileges:
      try:
        admin_client.execute("revoke {0} on database {1} from user {2}"
                             .format(privilege, unique_database, user))
      except Exception as e:
        LOG.error("Ignored exception in cleanup: " + str(e))
    admin_client.execute("drop database if exists {0} cascade".format(unique_database))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_convert_table_to_iceberg(self, unique_name):
  """Test that authorization is taken into account when performing a table migration
  to Iceberg.

  The first part checks which privileges a non-admin user needs for
  'ALTER TABLE ... CONVERT TO ICEBERG'. The second part checks that a table-level
  grant still lets the user query the table after an admin has migrated it.
  """
  user = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  non_admin_client = self.create_impala_client(user=user)
  unique_database = unique_name + "_db"
  tbl_name = unique_database + "." + "hive_tbl_to_convert"
  # Statement templates shared by both parts; '{0}' is the table name.
  create_tmpl = "create table {0} (a int, b string) stored as parquet"
  insert_tmpl = "insert into {0} values (1, 'one')"
  convert_tmpl = "alter table {0} convert to iceberg"
  table_grant_tmpl = "grant all on table {0} to user {1}"
  table_revoke_tmpl = "revoke all on table {0} from user {1}"

  try:
    admin_client.execute("drop database if exists {0} cascade"
                         .format(unique_database))
    admin_client.execute("create database {0}".format(unique_database))

    # Create the table using the admin user.
    admin_client.execute(create_tmpl.format(tbl_name))
    admin_client.execute(insert_tmpl.format(tbl_name))

    try:
      access_denied = "User '{0}' does not have privileges to access: {1}".format(
          user, unique_database)
      # A non-admin user can't convert the table by default.
      result = self.execute_query_expect_failure(
          non_admin_client, convert_tmpl.format(tbl_name))
      assert access_denied in str(result)

      # Grant ALL privileges on the table for the non-admin user. Even with this the
      # query should fail as we expect DB level ALL privileges for table migration.
      # Once https://issues.apache.org/jira/browse/IMPALA-12190 is fixed, this should
      # also pass with table-level ALL privileges.
      admin_client.execute(table_grant_tmpl.format(tbl_name, user))
      result = self.execute_query_expect_failure(
          non_admin_client, convert_tmpl.format(tbl_name))
      assert access_denied in str(result)

      # After granting ALL privileges on the DB, the table migration should succeed.
      admin_client.execute("grant all on database {0} to user {1}"
                           .format(unique_database, user))
      self.execute_query_expect_success(
          non_admin_client, convert_tmpl.format(tbl_name))

      result = non_admin_client.execute("describe formatted {0}".format(tbl_name))
      all_data = result.get_data()
      # The table description has to mention the Iceberg SerDe and file formats.
      for iceberg_class in ("org.apache.iceberg.mr.hive.HiveIcebergSerDe",
                            "org.apache.iceberg.mr.hive.HiveIcebergInputFormat",
                            "org.apache.iceberg.mr.hive.HiveIcebergOutputFormat"):
        assert iceberg_class in all_data
    finally:
      # Revoke privileges
      admin_client.execute(table_revoke_tmpl.format(tbl_name, user))
      admin_client.execute("revoke all on database {0} from user {1}"
                           .format(unique_database, user))

    tbl_name2 = unique_database + "." + "hive_tbl_to_convert2"
    count_query = "select count(*) from {0}".format(tbl_name2)
    # Create the second table using the admin user.
    admin_client.execute(create_tmpl.format(tbl_name2))
    admin_client.execute(insert_tmpl.format(tbl_name2))

    try:
      admin_client.execute(table_grant_tmpl.format(tbl_name2, user))
      result = self.execute_query_expect_success(non_admin_client, count_query)
      assert result.get_data() == "1"

      # Migrate the table as admin and check that the non-admin user still has
      # privileges.
      self.execute_query_expect_success(
          admin_client, convert_tmpl.format(tbl_name2))

      result = self.execute_query_expect_success(non_admin_client, count_query)
      assert result.get_data() == "1"
    finally:
      # Revoke privileges
      admin_client.execute(table_revoke_tmpl.format(tbl_name2, user))

  finally:
    admin_client.execute("drop database if exists {0} cascade".format(unique_database))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_iceberg_metadata_table_privileges(self, unique_name):
  """Check that querying the 'history' metadata table of an Iceberg table is denied
  to a user without SELECT privilege and succeeds once SELECT on the table has been
  granted to that user."""
  user = getuser()
  admin_client = self.create_impala_client(user=ADMIN)
  non_admin_client = self.create_impala_client(user=user)
  short_table_name = "ice_1"
  unique_database = unique_name + "_db"
  tbl_name = unique_database + "." + short_table_name
  history_query = "select * from {0}.history".format(tbl_name)

  try:
    admin_client.execute("drop database if exists {0} cascade"
                         .format(unique_database))
    admin_client.execute("create database {0}".format(unique_database))
    admin_client.execute("create table {0} (a int) stored as iceberg"
                         .format(tbl_name))

    # At this point, the non-admin user has no select privileges and cannot query the
    # metadata tables.
    result = self.execute_query_expect_failure(non_admin_client, history_query)
    select_denied = "User '{0}' does not have privileges to execute 'SELECT' on: {1}" \
        .format(user, unique_database)
    assert select_denied in str(result)

    # Grant 'user' select privilege on the table.
    admin_client.execute("grant select on table {0} to user {1}".format(tbl_name, user))
    result = non_admin_client.execute(history_query)
    assert result.success is True

  finally:
    admin_client.execute("revoke select on table {0} from user {1}"
                         .format(tbl_name, user))
    admin_client.execute("drop database if exists {0} cascade".format(unique_database))
|
|
|
|
@pytest.mark.execute_serially
|
|
def test_profile_protection(self):
  """Test that a requesting user is able to access the runtime profile or execution
  summary of a query involving a view only if the user is granted the privileges on all
  the underlying tables of the view. Recall that the view functional.complex_view we
  use here is created based on the tables functional.alltypesagg and
  functional.alltypestiny.

  SELECT is granted step by step: first on the view only, then additionally on
  functional.alltypesagg, and finally on functional.alltypestiny as well. The query
  on the view is expected to fail with the 'not authorized' message after each of
  the first two steps and to succeed after the third.
  """
  grantee_user = "non_owner"
  admin_client = self.create_impala_client(user=ADMIN)
  non_owner_client = self.create_impala_client(user=grantee_user)
  test_db = "functional"
  test_view = "complex_view"
  view_query = "select count(*) from {0}.{1}".format(test_db, test_view)
  not_authorized = ("User {0} is not authorized to access the runtime profile or "
                    "execution summary".format(grantee_user))
  try:
    admin_client.execute(
        "grant select on table {0}.{1} to user {2}"
        .format(test_db, test_view, grantee_user))

    admin_client.execute("refresh authorization")
    # Recall that in a successful execution, result.exec_summary and
    # result.runtime_profile store the execution summary and runtime profile,
    # respectively. But when the requesting user does not have the privileges
    # on the underlying tables, an exception will be thrown from
    # ImpalaBeeswaxClient.get_runtime_profile().
    result = self.execute_query_expect_failure(non_owner_client, view_query)
    assert not_authorized in str(result)

    admin_client.execute(
        "grant select on table {0}.alltypesagg to user {1}"
        .format(test_db, grantee_user))

    admin_client.execute("refresh authorization")
    # Only one of the two underlying tables is accessible, so access must still be
    # denied. The result has to be captured again here: otherwise the assertion
    # below would inspect the failure of the previous step instead of this one.
    result = self.execute_query_expect_failure(non_owner_client, view_query)
    assert not_authorized in str(result)

    admin_client.execute(
        "grant select on table {0}.alltypestiny to user {1}"
        .format(test_db, grantee_user))

    admin_client.execute("refresh authorization")
    self.execute_query_expect_success(non_owner_client, view_query)
  finally:
    # All three revokes are issued no matter how far the test got.
    cleanup_statements = [
      "revoke select on table {0}.{1} from user {2}"
      .format(test_db, test_view, grantee_user),
      "revoke select on table {0}.alltypesagg from user {1}"
      .format(test_db, grantee_user),
      "revoke select on table {0}.alltypestiny from user {1}"
      .format(test_db, grantee_user)
    ]

    for statement in cleanup_statements:
      admin_client.execute(statement)
|
|
|
|
|
|
class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite):
  """
  Apache Ranger column masking policies exercised on the tpch nested tables.
  """

  @classmethod
  def default_test_protocol(cls):
    return HS2

  @classmethod
  def get_workload(cls):
    return 'tpch_nested'

  @classmethod
  def add_custom_cluster_constraints(cls):
    # The super() implementation is deliberately not called because this class needs
    # to relax the set of constraints: only the file format is restricted here.
    cls.ImpalaTestMatrix.add_constraint(
        lambda vec: vec.get_value('table_format').file_format == 'parquet')

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
  def test_tpch_nested_column_masking(self, vector):
    """Test column masking on nested tables.

    The PII columns of 'customer', 'supplier' and 'part' in 'tpch_nested_parquet' are
    masked for the current user. The 'tpch_nested-*' test files are then run for one
    set of queries and the 'masked-tpch_nested-*' files for the other set. All
    policies are removed in the 'finally' block."""
    user = getuser()
    db = "tpch_nested_parquet"
    # Mask PII columns: name, phone, address
    tbl_cols = {
      "customer": ["c_name", "c_phone", "c_address"],
      "supplier": ["s_name", "s_phone", "s_address"],
      "part": ["p_name"],
    }
    # Every (table, column) pair that gets a policy; setup and cleanup both derive
    # the policy name from such a pair.
    masked_columns = [(tbl, col) for tbl in tbl_cols for col in tbl_cols[tbl]]
    # The current user is not privileged to refresh authorization, hence the
    # dedicated admin connection.
    admin_client = self.create_impala_client(user=ADMIN)
    try:
      for tbl, col in masked_columns:
        # Q22 requires showing the first 2 chars of the phone column.
        if col.endswith("phone"):
          mask_type = "MASK_SHOW_FIRST_4"
        else:
          mask_type = "MASK"
        TestRanger._add_column_masking_policy(
            "%s_%s_mask" % (tbl, col), user, db, tbl, col, mask_type)
      self.execute_query_expect_success(admin_client, "refresh authorization")
      same_result_queries = ["q1", "q3", "q4", "q5", "q6", "q7", "q8", "q11", "q12",
                             "q13", "q14", "q16", "q17", "q19", "q22"]
      result_masked_queries = ["q9", "q10", "q15", "q18", "q20", "q21", "q2"]
      # The first group is run against the plain test files, the second group
      # against their 'masked-' counterparts.
      test_files = ["tpch_nested-" + q for q in same_result_queries]
      test_files += ["masked-tpch_nested-" + q for q in result_masked_queries]
      for test_file in test_files:
        self.run_test_case(test_file, vector, use_db=db)
    finally:
      for tbl, col in masked_columns:
        TestRanger._remove_policy("%s_%s_mask" % (tbl, col))
|
|
|
|
|
|
class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
  """
  Tests Ranger policies when complex types are given in the select list. This is a
  separate class because directly querying complex types works only on HS2, while some
  tests in TestRanger need the Beeswax interface and would fail otherwise.
  """

  @classmethod
  def default_test_protocol(cls):
    """Return HS2 as the default client protocol for the tests of this class."""
    return HS2

  @classmethod
  def add_test_dimensions(cls):
    """Add the client protocol and ORC dimensions and keep only the 'hs2' protocol."""
    super(TestRangerColumnMaskingComplexTypesInSelectList, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_orc_dimension(cls.get_workload()))
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('protocol') == 'hs2')

  @classmethod
  def add_custom_cluster_constraints(cls):
    """Intentionally add no constraints."""
    # Do not call the super() implementation, because this class needs to relax
    # the set of constraints. The usual constraints only run on uncompressed text.
    # This disables that constraint to let us run against only ORC.
    pass

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS)
  def test_column_masking_with_structs_in_select_list(self, vector, unique_name):
    """
    Add MASK_NULL column masking policies on several columns of tables that contain
    nested columns, run the corresponding test file, and remove the added policies.
    """
    user = getuser()
    db = "functional_orc_def"
    # Create another client for admin user since current user doesn't have privileges to
    # create/drop databases or refresh authorization.
    admin_client = self.create_impala_client(user=ADMIN)
    # (table, column) pairs to mask. The first entry is a primitive column of a table
    # which contains nested columns.
    masked_columns = [
      ("complextypes_structs", "str"),
      ("complextypes_structs", "tiny_struct"),
      ("complextypestbl", "id"),
      ("complextypestbl", "int_array_array"),
      ("complextypestbl", "int_array_map"),
    ]
    # Policies added so far; each is named 'unique_name' plus its zero-based index.
    added_policies = []
    try:
      for tbl, col in masked_columns:
        policy_name = unique_name + str(len(added_policies))
        TestRanger._add_column_masking_policy(
          policy_name, user, db, tbl, col, "MASK_NULL")
        added_policies.append(policy_name)
      self.execute_query_expect_success(admin_client, "refresh authorization")
      self.run_test_case("QueryTest/ranger_column_masking_struct_in_select_list", vector,
                         use_db=db)
    finally:
      for policy_name in added_policies:
        TestRanger._remove_policy(policy_name)
|
|
|
|
|
|
# Attribute name under which a test method's cluster start arguments are stored.
START_ARGS = 'start_args'
# impalad flags for running against the Iceberg REST catalog config under IMPALA_HOME,
# followed by the common IMPALAD_ARGS. The flags are separated by whitespace (including
# a newline) -- presumably the consumer splits on whitespace; verify before reformatting.
ICEBERG_REST_IMPALAD_ARGS = (
    "--use_local_catalog=true --catalogd_deployed=false\n"
    "--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config "
    .format(os.environ['IMPALA_HOME']) + IMPALAD_ARGS)
|
|
|
|
|
|
class ScopedPrivilege(object):
  """
  Context manager that calls TestRanger._grant_ranger_privilege() on entry and
  TestRanger._revoke_ranger_privilege() on exit with the same user, resource and access.
  """

  def __init__(self, user, resource, access):
    self.user, self.resource, self.access = user, resource, access

  def _privilege_args(self):
    """Return the (user, resource, access) triple passed to the grant/revoke calls."""
    return self.user, self.resource, self.access

  def __enter__(self):
    TestRanger._grant_ranger_privilege(*self._privilege_args())

  def __exit__(self, exc_type, exc_val, exc_tb):  # noqa: U100
    # Returns None, so an exception raised inside the 'with' body is not suppressed.
    TestRanger._revoke_ranger_privilege(*self._privilege_args())
|
|
|
|
|
|
class TestRangerIcebergRestCatalog(TestRanger):
  """
  Tests for Apache Ranger policies on Iceberg tables in the REST Catalog.

  The class-level setup starts an IcebergRestServer and stops the Hive service; the
  class-level teardown reverses both. Every test starts its cluster with
  '--no_catalogd'.
  """

  @classmethod
  def default_test_protocol(cls):
    """Return HS2 as the default client protocol for the tests of this class."""
    return HS2

  @classmethod
  def need_default_clients(cls):
    """There will be no HMS, so we shouldn't create the Hive client."""
    return False

  @classmethod
  def setup_class(cls):
    """
    Run the parent setup, start the Iceberg REST server and stop the Hive service.

    If starting the REST server fails it is stopped again; if stopping Hive fails the
    infra services are restored via cleanup_infra_services(). In both cases the original
    exception is re-raised.
    """
    super(TestRangerIcebergRestCatalog, cls).setup_class()
    # Construct the server outside of the try block: if the constructor itself fails
    # there is nothing to stop, and touching the not-yet-assigned attribute in the
    # handler would replace the real error with an AttributeError.
    cls.iceberg_rest_server = IcebergRestServer()
    try:
      cls.iceberg_rest_server.start_rest_server(300)
    except Exception:
      cls.iceberg_rest_server.stop_rest_server(10)
      # Bare 'raise' keeps the original traceback.
      raise

    try:
      cls._stop_hive_service()
    except Exception:
      cls.cleanup_infra_services()
      raise

  @classmethod
  def teardown_class(cls):
    """Restore the infra services, then run the parent teardown."""
    try:
      cls.cleanup_infra_services()
    finally:
      # Run the parent teardown even if restoring the infra services failed.
      super(TestRangerIcebergRestCatalog, cls).teardown_class()

  @classmethod
  def cleanup_infra_services(cls):
    """Stop the Iceberg REST server and start the Hive service again."""
    try:
      cls.iceberg_rest_server.stop_rest_server(10)
    finally:
      # Hive must be restarted even if stopping the REST server failed, otherwise
      # it would stay down after this class finishes.
      cls._start_hive_service(None)

  @classmethod
  def add_custom_cluster_constraints(cls):
    """Restrict the test matrix to the parquet file format."""
    # Do not call the super() implementation because this class needs to relax the
    # set of constraints.
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'parquet')

  def setup_method(self, method):
    """
    Make sure the cluster for 'method' is started with '--no_catalogd', run the parent
    setup and create the Impala clients (including 'self.admin_client').

    Raises an Exception if HIVE_CONF_DIR is set on 'method', because the tests of this
    class run without Hive.
    """
    args = method.__dict__
    if HIVE_CONF_DIR in args:
      raise Exception("Cannot specify HIVE_CONF_DIR because the tests of this class are "
                      "running without Hive.")
    # Invoke start-impala-cluster.py with '--no_catalogd'
    no_catalogd = "--no_catalogd"
    start_args = no_catalogd
    if START_ARGS in args:
      existing_args = args[START_ARGS]
      # 'args' is the method's own attribute dict and is modified in place, so the
      # value written below is seen again if setup_method() runs more than once for
      # the same method. Only append the flag when it is not already present.
      if no_catalogd in existing_args.split():
        start_args = existing_args
      else:
        start_args = existing_args + " " + no_catalogd
    args[START_ARGS] = start_args

    super(TestRangerIcebergRestCatalog, self).setup_method(method)
    # At this point we can create the Impala clients that we will need.
    self.create_impala_clients()
    self.admin_client = self.create_impala_client(user=ADMIN)

  def _get_all_resource(self):
    """Return a resource dict that uses '*' for database, table and column."""
    return {
      "database": "*",
      "column": "*",
      "table": "*"
    }

  def _get_limited_resource(self):
    """Return a resource dict for all columns of table 'ice.airports_parquet'."""
    return {
      "database": "ice",
      "column": "*",
      "table": "airports_parquet"
    }

  def _get_access(self):
    """Return the list of access types granted by the tests of this class."""
    return ["select", "read"]

  @CustomClusterTestSuite.with_args(
    impalad_args=ICEBERG_REST_IMPALAD_ARGS)
  def test_rest_catalog_basic(self, vector):
    """Run iceberg-rest-catalog.test with all the required privileges."""
    with ScopedPrivilege(getuser(), self._get_all_resource(), self._get_access()):
      self.admin_client.execute("refresh authorization")
      self.run_test_case('QueryTest/iceberg-rest-catalog', vector, use_db="ice")

  @CustomClusterTestSuite.with_args(
    impalad_args=ICEBERG_REST_IMPALAD_ARGS)
  def test_rest_catalog_fgac(self, vector):
    """Test that fine-grained access control works with Iceberg REST Catalog."""
    with ScopedPrivilege(getuser(), self._get_limited_resource(), self._get_access()):
      self.admin_client.execute("refresh authorization")
      try:
        self._add_column_masking_policy("column-masking-for-airports_parquet", getuser(),
            "ice", "airports_parquet", "lat", "CUSTOM", "lat + 1000")
        self._add_row_filtering_policy("row-filtering-for-airports_parquet", getuser(),
            "ice", "airports_parquet", "country = 'USA'")
        self.admin_client.execute("refresh authorization")
        self.run_test_case('QueryTest/iceberg-rest-fgac', vector, use_db="ice")
      finally:
        self._remove_policy("column-masking-for-airports_parquet")
        self._remove_policy("row-filtering-for-airports_parquet")
|
|
|
|
|
|
@CustomClusterTestSuite.with_args(
    start_args="--env_vars=USE_CALCITE_PLANNER=true",
    impalad_args=IMPALAD_ARGS,
    catalogd_args=CATALOGD_ARGS)
class TestRangerWithCalcite(TestRanger):
  """
  Tests for verifying the behavior of the Calcite planner with respect to
  authorization via the Ranger server.
  """

  def test_select_calcite_frontend(self, unique_name):
    """Delegate to the shared _test_select_calcite_frontend() implementation."""
    self._test_select_calcite_frontend(unique_name)

  def test_view_on_view_all_configs(self, unique_name):
    """Delegate to the shared _test_view_on_view_all_configs() implementation."""
    self._test_view_on_view_all_configs(unique_name)

  @pytest.mark.execute_serially
  def test_table_masking_calcite_frontend(self, unique_name):
    """Delegate to the shared _test_table_masking_calcite_frontend() implementation."""
    self._test_table_masking_calcite_frontend(unique_name)

  @pytest.mark.execute_serially
  def test_cte(self):
    """
    This verifies the Calcite planner won't treat CTEs as names of actual tables.
    """
    with self.create_impala_client(user=ADMIN) as admin_client, \
         self.create_impala_client(user=NON_OWNER) as non_owner_client:
      # Set the query option of 'use_calcite_planner' to 1 to use the Calcite planner.
      non_owner_client.set_configuration({"use_calcite_planner": 1})
      database = "functional"
      alltypes = "{0}.{1}".format(database, "alltypes")
      alltypestiny = "{0}.{1}".format(database, "alltypestiny")
      granted_tables = (alltypes, alltypestiny)

      # A query with a single CTE.
      single_cte_query = "with t as (select * from {0}) select * from t".format(alltypes)
      # A query that has a WITH clause involving more than one CTE.
      multi_cte_query = (
          "with "
          "t1 as (select id from {0}), "
          "t2 as (select id from {1}) "
          "select * from t1, t2 where t1.id = t2.id").format(alltypes, alltypestiny)
      # A query that has more than one WITH clause. A WITH clause is defined in the
      # inline view 'v', and the outer CTE, i.e., 't1', is not referenced within 'v'.
      nested_with_query = (
          "with "
          "t1 as (select id from {0}) "
          "select v.id from "
          "(with t2 as (select id from {1}) select * from t2) v, t1 "
          "where v.id = t1.id").format(alltypes, alltypestiny)
      # A query that has more than one WITH clause. A WITH clause is defined in the
      # inline view 'v', and the outer CTE, i.e., 't1', is referenced within 'v'.
      nested_with_outer_ref_query = (
          "with "
          "t1 as (select id from {0} where id < 2) "
          "select v.id from "
          "(with t2 as (select id from {1}) select * from t2, t1 "
          "where t2.id = t1.id) v").format(alltypes, alltypestiny)
      expected_classic_error = ("AnalysisException: duplicated inline view column "
                                "alias: 'id' in inline view 'v'")
      try:
        for table in granted_tables:
          admin_client.execute("grant select on table {0} to user {1}"
              .format(table, NON_OWNER))
        # All four queries are expected to run with the Calcite planner.
        for query in (single_cte_query, multi_cte_query, nested_with_query,
                      nested_with_outer_ref_query):
          non_owner_client.execute(query)

        # Set the query option of 'use_calcite_planner' to 0 to use the classic frontend.
        non_owner_client.set_configuration({"use_calcite_planner": 0})
        # Impala's classic frontend supports the query whose inline view does not
        # reference the outer CTE.
        non_owner_client.execute(nested_with_query)
        # Impala's classic frontend could not support the query whose inline view
        # references the outer CTE.
        result = self.execute_query_expect_failure(
            non_owner_client, nested_with_outer_ref_query)
        assert expected_classic_error in str(result)
      finally:
        for table in granted_tables:
          admin_client.execute("revoke select on table {0} from user {1}"
              .format(table, NON_OWNER))
|