IMPALA-14017: Add Ranger tests to Iceberg REST Catalog

This patch adds authorization tests for the case when
Impala only connects to an Iceberg REST Catalog. To make
the tests faster it also implements REFRESH AUTHORIZATION
without CatalogD.

Testing:
 * custom cluster tests added with Ranger + Iceberg REST Catalog

Change-Id: I30d506e04537c5ca878ab9cf58792bc8a6b560c3
Reviewed-on: http://gerrit.cloudera.org:8080/23118
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Noemi Pap-Takacs <npaptakacs@cloudera.com>
This commit is contained in:
Zoltan Borok-Nagy
2025-05-21 13:11:00 +02:00
committed by Noemi Pap-Takacs
parent 41b6652fbf
commit eaadf7ada5
10 changed files with 333 additions and 34 deletions

View File

@@ -48,6 +48,7 @@ using namespace impala;
using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
DECLARE_bool(catalogd_deployed);
DECLARE_bool(use_local_catalog);
DECLARE_string(debug_actions);
@@ -106,6 +107,10 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
// Compute stats stmts must be executed via ExecComputeStats().
DCHECK(request.ddl_params.ddl_type != TDdlType::COMPUTE_STATS);
if (!FLAGS_catalogd_deployed) {
return Status("Operation is not supported without CatalogD.");
}
exec_response_.reset(new TDdlExecResponse());
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =
@@ -133,6 +138,17 @@ Status CatalogOpExecutor::Exec(const TCatalogOpRequest& request) {
return status;
}
case TCatalogOpType::RESET_METADATA: {
if (!FLAGS_catalogd_deployed) {
if (request.__isset.reset_metadata_params) {
const TResetMetadataRequest& metadata_params = request.reset_metadata_params;
if (metadata_params.authorization) {
return fe_->RefreshAuthorization();
}
}
// We don't cache metadata without CatalogD.
return Status::OK();
}
TResetMetadataResponse response;
int attempt = 0; // Used for debug action only.
CatalogServiceConnection::RpcStatus rpc_status =

View File

@@ -150,6 +150,7 @@ Frontend::Frontend() {
{"abortTransaction", "(J)V", &abort_txn_},
{"addTransaction", "([B)V", &add_txn_},
{"unregisterTransaction", "(J)V", &unregister_txn_},
{"refreshAuthorization", "()V", &refresh_authorization_},
{"getSaml2Redirect", "([B)[B", &get_saml2_redirect_id_},
{"validateSaml2Response", "([B)[B", &validate_saml2_response_id_},
{"validateSaml2Bearer", "([B)Ljava/lang/String;", &validate_saml2_bearer_id_},
@@ -376,6 +377,10 @@ Status Frontend::UnregisterTransaction(int64_t transaction_id) {
return JniUtil::CallJniMethod(fe_, unregister_txn_, transaction_id);
}
// Invokes the Java method bound to 'refresh_authorization_'
// (JniFrontend.refreshAuthorization(), signature "()V") on the 'fe_' object and
// returns the Status reported by the JNI call helper.
Status Frontend::RefreshAuthorization() {
return JniUtil::CallJniMethod(fe_, refresh_authorization_);
}
// Reports whether 'status' is a failure whose detail text starts with the literal
// "AuthorizationException". A successful status is never an authorization error.
bool Frontend::IsAuthorizationError(const Status& status) {
  if (status.ok()) return false;
  // Position 0 means the marker is a prefix of the detail message.
  const auto& detail = status.GetDetail();
  return detail.find("AuthorizationException") == 0;
}

View File

@@ -206,6 +206,9 @@ class Frontend {
/// Unregisters an already committed transaction.
Status UnregisterTransaction(int64_t transaction_id);
/// Refreshes authorization.
Status RefreshAuthorization();
/// Returns true if the error returned by the FE was due to an AuthorizationException.
static bool IsAuthorizationError(const Status& status);
@@ -295,6 +298,7 @@ class Frontend {
jmethodID add_txn_; // JniFrontend.addTransaction()
jmethodID abort_txn_; // JniFrontend.abortTransaction()
jmethodID unregister_txn_; // JniFrontend.unregisterTransaction()
jmethodID refresh_authorization_; // JniFrontend.refreshAuthorization()
jmethodID get_saml2_redirect_id_; // JniFrontend.getSaml2Redirect()
jmethodID validate_saml2_response_id_; // JniFrontend.validateSaml2Response()
jmethodID validate_saml2_bearer_id_; // JniFrontend.validateSaml2Bearer()

View File

@@ -2408,6 +2408,7 @@ void ImpalaServer::WaitForMinCatalogUpdate(const int64_t min_req_catalog_object_
Status ImpalaServer::ProcessCatalogUpdateResult(
const TCatalogUpdateResult& catalog_update_result, bool wait_for_all_subscribers,
const TQueryOptions& query_options, RuntimeProfile::EventSequence* timeline) {
if (!FLAGS_catalogd_deployed) return Status::OK();
const TUniqueId& catalog_service_id = catalog_update_result.catalog_service_id;
if (!catalog_update_result.__isset.updated_catalog_objects &&
!catalog_update_result.__isset.removed_catalog_objects) {

View File

@@ -596,6 +596,10 @@ public class Frontend {
public AuthorizationChecker getAuthzChecker() { return authzChecker_.get(); }
/**
 * Refreshes authorization on this frontend by asking the current authorization
 * checker to invalidate its authorization cache.
 */
public void refreshAuthorization() {
authzChecker_.get().invalidateAuthorizationCache();
}
public AuthorizationManager getAuthzManager() { return authzManager_; }
public ImpaladTableUsageTracker getImpaladTableUsageTracker() {

View File

@@ -669,6 +669,11 @@ public class JniFrontend {
frontend_.waitForCatalog();
}
/**
 * Entry point for refreshing authorization; delegates to
 * Frontend.refreshAuthorization(). Fails the precondition check if the frontend
 * has not been set.
 */
public void refreshAuthorization() {
Preconditions.checkNotNull(frontend_);
frontend_.refreshAuthorization();
}
FeTable getCatalogTable(byte[] tableNameParam) throws ImpalaException {
Preconditions.checkNotNull(frontend_);
TTableName tableName = new TTableName();

View File

@@ -0,0 +1,119 @@
====
---- QUERY
SHOW DATABASES;
---- RESULTS
'ice',''
---- TYPES
STRING, STRING
====
---- QUERY
# User can only see table 'airports_parquet'.
SHOW TABLES;
---- RESULTS
'airports_parquet'
---- TYPES
STRING
====
---- QUERY
# 'lat' values have +1000 because of the column masking rule.
SELECT lat, lon, city, airport FROM airports_parquet WHERE iata = '00R';
---- RESULTS
1030.68586111,-95.01792777999999,'Livingston','Livingston Municipal'
---- TYPES
DOUBLE,DOUBLE,STRING,STRING
====
---- QUERY
# count(*) with row filtering
SELECT count(*) FROM airports_parquet;
---- RESULTS
3376
---- TYPES
BIGINT
====
---- QUERY
# User can only see country = 'USA' due to row filtering.
SELECT distinct country FROM airports_parquet;
---- RESULTS
'USA'
---- TYPES
STRING
====
---- QUERY
DESCRIBE ice.airports_parquet
---- RESULTS
'iata','string','','true'
'airport','string','','true'
'city','string','','true'
'state','double','','true'
'country','string','','true'
'lat','double','','true'
'lon','double','','true'
---- TYPES
STRING, STRING, STRING, STRING
====
---- QUERY
DESCRIBE FORMATTED ice.airports_parquet;
---- RESULTS: VERIFY_IS_SUBSET
'# col_name ','data_type ','comment '
'','NULL','NULL'
'iata','string','NULL'
'airport','string','NULL'
'city','string','NULL'
'state','double','NULL'
'country','string','NULL'
'lat','double','NULL'
'lon','double','NULL'
'','NULL','NULL'
'# Detailed Table Information','NULL','NULL'
'Database: ','ice ','NULL'
'OwnerType: ','USER ','NULL'
'Owner: ','null ','NULL'
'Location: ','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/airports_parquet','NULL'
row_regex:'Erasure Coding Policy:','$ERASURECODE_POLICY *','NULL'
'Table Type: ','EXTERNAL_TABLE ','NULL'
'Table Parameters:','NULL','NULL'
'','EXTERNAL ','TRUE '
'','bucketing_version ','2 '
'','engine.hive.enabled ','true '
'','gc.enabled ','TRUE '
'','numFiles ','1 '
'','storage_handler ','org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
'','table_type ','ICEBERG '
'','write.format.default','parquet '
'','NULL','NULL'
'# Storage Information','NULL','NULL'
'SerDe Library: ','org.apache.iceberg.mr.hive.HiveIcebergSerDe','NULL'
'InputFormat: ','org.apache.iceberg.mr.hive.HiveIcebergInputFormat','NULL'
'OutputFormat: ','org.apache.iceberg.mr.hive.HiveIcebergOutputFormat','NULL'
'Compressed: ','No ','NULL'
'Sort Columns: ','[] ','NULL'
'','NULL','NULL'
'# Constraints','NULL','NULL'
---- TYPES
string, string, string
====
---- QUERY
SELECT * FROM airports_orc;
---- CATCH
does not have privileges to execute 'SELECT'
====
---- QUERY
SELECT * FROM ice.airports_orc.history;
---- CATCH
does not have privileges to execute 'SELECT'
====
---- QUERY
DESCRIBE ice.airports_orc;
---- CATCH
does not have privileges to access: ice.airports_orc.*
====
---- QUERY
DESCRIBE FORMATTED ice.airports_orc;
---- CATCH
does not have privileges to access: ice.airports_orc.*
====
---- QUERY
show table stats ice.airports_orc;
---- CATCH
does not have privileges to access: ice.airports_orc
====

View File

@@ -33,6 +33,7 @@ import requests
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.file_utils import copy_files_to_hdfs_dir
from tests.common.iceberg_rest_server import IcebergRestServer
from tests.common.skip import SkipIf, SkipIfFS, SkipIfHive2
from tests.common.test_dimensions import (
create_client_protocol_dimension,
@@ -3231,3 +3232,102 @@ class TestRangerColumnMaskingComplexTypesInSelectList(CustomClusterTestSuite):
finally:
for i in range(policy_cnt):
TestRanger._remove_policy(unique_name + str(i))
# Key under which extra start-impala-cluster.py arguments are stored in a test
# method's __dict__ (read and updated by setup_method below).
START_ARGS = 'start_args'
# impalad flags for running against the Iceberg REST Catalog config with
# --catalogd_deployed=false; the flags from IMPALAD_ARGS are appended at the end.
ICEBERG_REST_IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
--catalog_config_dir={}/testdata/configs/catalog_configs/iceberg_rest_config """\
.format(os.environ['IMPALA_HOME']) + IMPALAD_ARGS
class ScopedPrivilege(object):
  """
  Context manager that grants a Ranger privilege when the 'with' block is entered and
  revokes the same privilege when the block is left, whether or not it raised.

  'user', 'resource' and 'access' are forwarded unchanged to
  TestRanger._grant_ranger_privilege() and TestRanger._revoke_ranger_privilege().
  """

  def __init__(self, user, resource, access):
    self.user = user
    self.resource = resource
    self.access = access

  def __enter__(self):
    TestRanger._grant_ranger_privilege(self.user, self.resource, self.access)
    # Return the manager so that 'with ScopedPrivilege(...) as p' binds a usable
    # object instead of None.
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):  # noqa: U100
    # Returns None, so an exception raised inside the 'with' block still propagates.
    TestRanger._revoke_ranger_privilege(self.user, self.resource, self.access)
class TestRangerIcebergRestCatalog(TestRanger):
  """
  Tests for Apache Ranger policies on Iceberg tables in the REST Catalog.

  Each test starts the Iceberg REST server, then the Impala cluster with
  '--no_catalogd', and stops the REST server again on teardown.
  """

  @classmethod
  def default_test_protocol(cls):
    return HS2

  @classmethod
  def add_custom_cluster_constraints(cls):
    # Do not call the super() implementation because this class needs to relax the
    # set of constraints.
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: v.get_value('table_format').file_format == 'parquet')

  def setup_method(self, method):
    """Starts the REST server and the cluster, then creates the admin client."""
    # Invoke start-impala-cluster.py with '--no_catalogd'. The flag is only appended
    # when it is missing, so running setup again for the same method does not keep
    # growing the stored argument string.
    no_catalogd = "--no_catalogd"
    if START_ARGS not in method.__dict__:
      method.__dict__[START_ARGS] = no_catalogd
    elif no_catalogd not in method.__dict__[START_ARGS].split():
      method.__dict__[START_ARGS] = method.__dict__[START_ARGS] + " " + no_catalogd

    try:
      self.iceberg_rest_server = IcebergRestServer()
      self.iceberg_rest_server.start_rest_server(300)
      super(TestRangerIcebergRestCatalog, self).setup_method(method)
      self.admin_client = self.create_impala_client(user=ADMIN)
    except Exception as e:
      print(e)
      self.iceberg_rest_server.stop_rest_server(10)
      # Bare 'raise' keeps the original traceback.
      raise

  def teardown_method(self, method):
    """Stops the REST server and always runs the base class teardown."""
    try:
      self.iceberg_rest_server.stop_rest_server(10)
    finally:
      # Run the base teardown even if stopping the REST server raised.
      super(TestRangerIcebergRestCatalog, self).teardown_method(method)

  def _get_all_resource(self):
    """Ranger resource matching every database, table and column."""
    return {
      "database": "*",
      "column": "*",
      "table": "*"
    }

  def _get_limited_resource(self):
    """Ranger resource restricted to all columns of ice.airports_parquet."""
    return {
      "database": "ice",
      "column": "*",
      "table": "airports_parquet"
    }

  def _get_access(self):
    """Access types granted by the tests in this class."""
    return ["select", "read"]

  @CustomClusterTestSuite.with_args(
    impalad_args=ICEBERG_REST_IMPALAD_ARGS)
  def test_rest_catalog_basic(self, vector):
    """Run iceberg-rest-catalog.test with all the required privileges."""
    with ScopedPrivilege(getuser(), self._get_all_resource(), self._get_access()):
      self.admin_client.execute("refresh authorization")
      self.run_test_case('QueryTest/iceberg-rest-catalog', vector, use_db="ice")

  @CustomClusterTestSuite.with_args(
    impalad_args=ICEBERG_REST_IMPALAD_ARGS)
  def test_rest_catalog_fgac(self, vector):
    """Test that fine-grained access control works with Iceberg REST Catalog."""
    with ScopedPrivilege(getuser(), self._get_limited_resource(), self._get_access()):
      self.admin_client.execute("refresh authorization")
      try:
        self._add_column_masking_policy("column-masking-for-airports_parquet", getuser(),
            "ice", "airports_parquet", "lat", "CUSTOM", "lat + 1000")
        self._add_row_filtering_policy("row-filtering-for-airports_parquet", getuser(),
            "ice", "airports_parquet", "country = 'USA'")
        self.admin_client.execute("refresh authorization")
        self.run_test_case('QueryTest/iceberg-rest-fgac', vector, use_db="ice")
      finally:
        # Attempt both removals even if the first one raises, so a failure does not
        # leave the row filtering policy behind.
        try:
          self._remove_policy("column-masking-for-airports_parquet")
        finally:
          self._remove_policy("row-filtering-for-airports_parquet")

View File

@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import logging
import os
import subprocess
import signal
import socket
import sys
import time
# Port on 'localhost' that is probed to decide whether the REST server is up or down.
REST_SERVER_PORT = 9084
# Used as the working directory when launching the REST server script.
IMPALA_HOME = os.environ['IMPALA_HOME']
LOG = logging.getLogger('impala_test_suite')
class IcebergRestServer(object):
  """
  Utility class for starting and stopping our minimal Iceberg REST server.

  The server is launched through testdata/bin/run-iceberg-rest-server.sh in its own
  session, and its availability is determined by probing REST_SERVER_PORT on
  localhost.
  """

  def __init__(self):
    # No process exists until start_rest_server() has spawned one. This keeps
    # stop_rest_server() safe to call from cleanup code after a failed start.
    self.process = None

  def start_rest_server(self, timeout_s):
    """Launches the server and waits up to 'timeout_s' seconds for its port to accept
    connections. Raises an Exception if the port does not open in time."""
    self.process = subprocess.Popen('testdata/bin/run-iceberg-rest-server.sh',
        stdout=sys.stdout, stderr=sys.stderr, shell=True,
        preexec_fn=os.setsid, cwd=IMPALA_HOME)
    self._wait_for_rest_server_to_start(timeout_s)

  def stop_rest_server(self, timeout_s):
    """Sends SIGTERM to the server's process group and waits up to 'timeout_s' seconds
    for the port to close. Does nothing if no server process was started."""
    if not self.process:
      return
    try:
      # The child was started with os.setsid, so its pid is also its process group id.
      os.killpg(self.process.pid, signal.SIGTERM)
    except OSError as e:
      # This is typically invoked during cleanup; a process group that is already gone
      # must not prevent the rest of the cleanup from running.
      LOG.info("Could not signal the Iceberg REST server: {}".format(e))
    self._wait_for_rest_server_to_be_killed(timeout_s)

  @staticmethod
  def _is_port_open():
    """Returns True if a TCP connection to localhost:REST_SERVER_PORT succeeds. The
    probe socket is always closed, including on the success path."""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
      return s.connect_ex(('localhost', REST_SERVER_PORT)) == 0
    finally:
      s.close()

  def _wait_for_rest_server_to_start(self, timeout_s):
    sleep_interval_s = 0.5
    start_time = time.time()
    while time.time() - start_time < timeout_s:
      if self._is_port_open():
        LOG.info("Iceberg REST server is available.")
        return
      time.sleep(sleep_interval_s)
    raise Exception(
        "Webserver did not become available within {} seconds.".format(timeout_s))

  def _wait_for_rest_server_to_be_killed(self, timeout_s):
    sleep_interval_s = 0.5
    start_time = time.time()
    while time.time() - start_time < timeout_s:
      if not self._is_port_open():
        LOG.info("Iceberg REST server has stopped.")
        return
      time.sleep(sleep_interval_s)
    # Let's not throw an exception as this is typically invoked during cleanup, and we
    # want the rest of the cleanup code to be executed.
    LOG.info("Iceberg REST server hasn't stopped in time.")

View File

@@ -17,14 +17,10 @@
from __future__ import absolute_import, division, print_function
import os
import socket
import time
import pytest
import signal
import subprocess
import sys
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.iceberg_rest_server import IcebergRestServer
REST_SERVER_PORT = 9084
IMPALA_HOME = os.environ['IMPALA_HOME']
@@ -35,6 +31,7 @@ IMPALAD_ARGS = """--use_local_catalog=true --catalogd_deployed=false
class TestIcebergRestCatalog(CustomClusterTestSuite):
"""Test suite for Iceberg REST Catalog."""
def setup_method(self, method):
# Invoke start-impala-cluster.py with '--no_catalogd'
@@ -44,42 +41,17 @@ class TestIcebergRestCatalog(CustomClusterTestSuite):
method.__dict__[START_ARGS] = start_args
try:
self._start_rest_server()
self._wait_for_rest_server(300)
self.iceberg_rest_server = IcebergRestServer()
self.iceberg_rest_server.start_rest_server(300)
super(TestIcebergRestCatalog, self).setup_method(method)
except Exception as e:
self._stop_rest_server()
self.iceberg_rest_server.stop_rest_server(10)
raise e
def teardown_method(self, method):
self._stop_rest_server()
self.iceberg_rest_server.stop_rest_server(10)
super(TestIcebergRestCatalog, self).teardown_method(method)
def _start_rest_server(self):
self.process = subprocess.Popen(
'testdata/bin/run-iceberg-rest-server.sh',
stdout=sys.stdout, stderr=sys.stderr, shell=True,
preexec_fn=os.setsid, cwd=IMPALA_HOME)
def _stop_rest_server(self):
if self.process:
os.killpg(self.process.pid, signal.SIGTERM)
def _wait_for_rest_server(self, timeout_s):
sleep_interval_s = 0.5
start_time = time.time()
while time.time() - start_time < timeout_s:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
if s.connect_ex(('localhost', REST_SERVER_PORT)) == 0:
print("Iceberg REST server is available.")
return
finally:
s.close()
time.sleep(sleep_interval_s)
raise Exception(
"Webserver did not become available within {} seconds.".format(timeout_s))
@CustomClusterTestSuite.with_args(
impalad_args=IMPALAD_ARGS)
@pytest.mark.execute_serially