mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-10876: Support to download JWKS from given URL
This patch added functionality to download JWKS from a given URL and support key rotation by periodically checking the JWKS URL for updates. We use Kudu's EasyCurl wrapper to download file from the given URL. curl was added to native-toolchain. This patch modified makefiles and bootstrap_toolchain.py to integrate libcurl and libkudu_curl_util. Added end-end JWT authentication test cases with JWKS specified as HTTP/HTTPS URL. Testing: - Passed core run, including new test cases. Change-Id: Ic6ac8cf0010c13db30219776d1d275709bf211df Reviewed-on: http://gerrit.cloudera.org:8080/17802 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
d7068ace15
commit
03a7a59f5d
@@ -118,6 +118,7 @@ set_dep_root(SNAPPY)
|
||||
set_dep_root(THRIFT)
|
||||
set_dep_root(ZLIB)
|
||||
set_dep_root(CCTZ)
|
||||
set_dep_root(CURL)
|
||||
|
||||
# The boost-cmake project hasn't been maintained for years. Let's make sure we
|
||||
# don't accidentally use it if it can be found.
|
||||
@@ -347,6 +348,10 @@ IMPALA_ADD_THIRDPARTY_LIB(krb5 ${KERBEROS_INCLUDE_DIR} "" ${KERBEROS_LIBRARY})
|
||||
# testing.
|
||||
find_package(KerberosPrograms REQUIRED)
|
||||
|
||||
# find curl headers and libs
|
||||
find_package(Curl REQUIRED)
|
||||
IMPALA_ADD_THIRDPARTY_LIB(curl ${CURL_INCLUDE_DIR} ${CURL_STATIC_LIB} "")
|
||||
|
||||
# Tests that run any security related tests need to link this in to override the
|
||||
# krb5_realm_override() implementation in krb5.
|
||||
# See be/src/kudu/security/krb5_realm_override.cc for more information.
|
||||
|
||||
@@ -356,6 +356,7 @@ set(CLANG_INCLUDE_FLAGS
|
||||
"-I${LDAP_INCLUDE_DIR}"
|
||||
"-I${PROTOBUF_INCLUDE_DIR}"
|
||||
"-I${CCTZ_INCLUDE_DIR}"
|
||||
"-I${CURL_INCLUDE_DIR}"
|
||||
)
|
||||
|
||||
# allow linking of static libs into dynamic lib
|
||||
@@ -424,6 +425,7 @@ set (IMPALA_LIBS
|
||||
histogram_proto
|
||||
ImpalaThrift
|
||||
Io
|
||||
kudu_curl_util
|
||||
kudu_util
|
||||
krpc
|
||||
Rpc
|
||||
@@ -545,7 +547,8 @@ set (IMPALA_DEPENDENCIES
|
||||
orc
|
||||
java_jvm
|
||||
kudu_client
|
||||
cctz)
|
||||
cctz
|
||||
curl)
|
||||
|
||||
# Add all external dependencies. They should come after the impala libs.
|
||||
set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS}
|
||||
|
||||
@@ -363,15 +363,13 @@ target_compile_definitions(kudu_util_compression PUBLIC LZ4_DISABLE_DEPRECATE_WA
|
||||
#######################################
|
||||
# kudu_curl_util
|
||||
#######################################
|
||||
# Impala doesn't have curl in its toolchain. It relies instead on the system curl, which
|
||||
# is too old on some OSes we compile on and doesn't include macros Kudu needs.
|
||||
#add_library(kudu_curl_util
|
||||
# curl_util.cc)
|
||||
#target_link_libraries(kudu_curl_util
|
||||
# security
|
||||
# ${CURL_LIBRARIES}
|
||||
# glog
|
||||
# gutil)
|
||||
add_library(kudu_curl_util
|
||||
curl_util.cc)
|
||||
target_link_libraries(kudu_curl_util
|
||||
security
|
||||
curl
|
||||
glog
|
||||
gutil)
|
||||
|
||||
#######################################
|
||||
# kudu_cloud_util
|
||||
@@ -607,11 +605,11 @@ endif()
|
||||
#######################################
|
||||
# curl_util-test
|
||||
#######################################
|
||||
#ADD_KUDU_TEST(curl_util-test)
|
||||
#if(NOT NO_TESTS)
|
||||
# target_link_libraries(curl_util-test
|
||||
# kudu_curl_util)
|
||||
#endif()
|
||||
ADD_KUDU_TEST(curl_util-test)
|
||||
if(NOT NO_TESTS)
|
||||
target_link_libraries(curl_util-test
|
||||
kudu_curl_util)
|
||||
endif()
|
||||
|
||||
#######################################
|
||||
# instance_detector-test
|
||||
|
||||
@@ -164,6 +164,14 @@ DEFINE_bool(jwt_validate_signature, true,
|
||||
// validate signatures. It represents cryptographic keys in JSON data structure.
|
||||
DEFINE_string(jwks_file_path, "",
|
||||
"File path of the pre-installed JSON Web Key Set (JWKS) for JWT verification");
|
||||
// This specifies the URL for JWKS to be downloaded.
|
||||
DEFINE_string(jwks_url, "", "URL of the JSON Web Key Set (JWKS) for JWT verification");
|
||||
DEFINE_int32(jwks_update_frequency_s, 60,
|
||||
"(Advanced) The time in seconds to wait between downloading JWKS from the specified "
|
||||
"URL.");
|
||||
DEFINE_int32(jwks_pulling_timeout_s, 10,
|
||||
"(Advanced) The time in seconds for connection timed out when pulling JWKS from the "
|
||||
"specified URL.");
|
||||
// This specifies the custom claim in the JWT that contains the "username" for the
|
||||
// session.
|
||||
DEFINE_string(jwt_custom_claim_username, "username", "Custom claim 'username'");
|
||||
|
||||
@@ -348,6 +348,7 @@ DEFINE_int32(admission_heartbeat_frequency_ms, 1000,
|
||||
DECLARE_bool(jwt_token_auth);
|
||||
DECLARE_bool(jwt_validate_signature);
|
||||
DECLARE_string(jwks_file_path);
|
||||
DECLARE_string(jwks_url);
|
||||
|
||||
namespace {
|
||||
using namespace impala;
|
||||
@@ -2912,7 +2913,10 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
|
||||
// Load JWKS from file if validation for signature of JWT token is enabled.
|
||||
if (FLAGS_jwt_token_auth && FLAGS_jwt_validate_signature) {
|
||||
if (!FLAGS_jwks_file_path.empty()) {
|
||||
RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_file_path));
|
||||
RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_file_path, true));
|
||||
} else if (!FLAGS_jwks_url.empty()) {
|
||||
if (TestInfo::is_test()) sleep(1);
|
||||
RETURN_IF_ERROR(JWTHelper::GetInstance()->Init(FLAGS_jwks_url, false));
|
||||
} else {
|
||||
LOG(ERROR) << "JWKS file is not specified when the validation of JWT signature "
|
||||
<< " is enabled.";
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
#ifndef IMPALA_JWT_UTIL_INTERNAL_H
|
||||
#define IMPALA_JWT_UTIL_INTERNAL_H
|
||||
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -35,6 +36,7 @@
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "common/status.h"
|
||||
#include "util/thread.h"
|
||||
|
||||
namespace impala {
|
||||
|
||||
@@ -238,17 +240,16 @@ class ECJWTPublicKeyBuilder {
|
||||
const std::string& base64_y, std::string& pub_key);
|
||||
};
|
||||
|
||||
/// JSON Web Key Set (JWKS) conveys the public keys used by the signing party to the
|
||||
/// clients that need to validate signatures. It represents a cryptographic key set in
|
||||
/// JSON data structure.
|
||||
/// This class works as JWT provider, which load the JWKS from file, store keys in an
|
||||
/// internal maps for each family of algorithms, and provides API to retrieve key by
|
||||
/// key-id.
|
||||
/// Init() should be called during the initialization of the daemon. There is no
|
||||
/// more modification for the instance after Init() return. The class is thread safe.
|
||||
class JsonWebKeySet {
|
||||
/// This class load the JWKS from file or URL, store keys in an internal maps for each
|
||||
/// family of algorithms, and provides API to retrieve key by key-id.
|
||||
/// It's a snapshot of the current JWKS. The JWKSMgr maintains a consistent copy of this
|
||||
/// and updates it atomically when the public keys in JWKS are changed. Clients can obtain
|
||||
/// an immutable copy. Class instances can be created through the implicitly-defined
|
||||
/// default and copy constructors.
|
||||
class JWKSSnapshot {
|
||||
public:
|
||||
explicit JsonWebKeySet() {}
|
||||
JWKSSnapshot() = default;
|
||||
JWKSSnapshot(const JWKSSnapshot&) = default;
|
||||
|
||||
/// Map from a key ID (kid) to a JWTPublicKey.
|
||||
typedef std::unordered_map<std::string, std::unique_ptr<JWTPublicKey>> JWTPublicKeyMap;
|
||||
@@ -256,7 +257,12 @@ class JsonWebKeySet {
|
||||
/// Load JWKS stored in a JSON file. Returns an error if problems were encountered
|
||||
/// while parsing/constructing the Json Web keys. If no keys were given in the file,
|
||||
/// the internal maps will be empty.
|
||||
Status Init(const std::string& jwks_file_path);
|
||||
Status LoadKeysFromFile(const std::string& jwks_file_path);
|
||||
/// Download JWKS JSON file from the given URL, then load the public keys if the
|
||||
/// checksum of JWKS object is changed. If no keys were given in the URL, the internal
|
||||
/// maps will be empty.
|
||||
Status LoadKeysFromUrl(
|
||||
const std::string& jwks_url, uint64_t cur_jwks_hash, bool* is_changed);
|
||||
|
||||
/// Look up the key ID in the internal key maps and returns the key if the lookup was
|
||||
/// successful, otherwise return nullptr.
|
||||
@@ -283,6 +289,8 @@ class JsonWebKeySet {
|
||||
return hs_key_map_.empty() && rsa_pub_key_map_.empty() && ec_pub_key_map_.empty();
|
||||
}
|
||||
|
||||
uint64_t GetChecksum() const { return jwks_checksum_; }
|
||||
|
||||
private:
|
||||
friend class JWKSetParser;
|
||||
|
||||
@@ -308,6 +316,64 @@ class JsonWebKeySet {
|
||||
/// Public keys for EC family of algorithms: ES256, ES384, ES512.
|
||||
/// kty (key type): EC.
|
||||
JWTPublicKeyMap ec_pub_key_map_;
|
||||
|
||||
/// 64 bit checksum of JWKS object.
|
||||
/// This variable is only used when downloading JWKS from the given URL.
|
||||
uint64_t jwks_checksum_ = 0;
|
||||
};
|
||||
|
||||
/// An immutable shared JWKS snapshot.
|
||||
typedef std::shared_ptr<const JWKSSnapshot> JWKSSnapshotPtr;
|
||||
|
||||
/// JSON Web Key Set (JWKS) conveys the public keys used by the signing party to the
|
||||
/// clients that need to validate signatures. It represents a cryptographic key set in
|
||||
/// JSON data structure.
|
||||
/// This class works as JWKS manager, which load the JWKS from local file or URL.
|
||||
/// Init() should be called during the initialization of the daemon.
|
||||
/// The class is thread safe.
|
||||
class JWKSMgr {
|
||||
public:
|
||||
explicit JWKSMgr() {}
|
||||
|
||||
/// Destructor is only called for backend tests
|
||||
~JWKSMgr();
|
||||
|
||||
/// Load JWKS stored in a JSON file. Returns an error if problems were encountered
|
||||
/// while parsing/constructing the Json Web keys. If no keys were given in the file,
|
||||
/// the internal maps will be empty.
|
||||
/// If the given jwks_uri is a URL, start a working thread which will periodically
|
||||
/// checks the JWKS URL for updates. This provides support for key rotation.
|
||||
Status Init(const std::string& jwks_uri, bool is_local_file);
|
||||
|
||||
/// Returns a read only snapshot of the current JWKS. This function should be called
|
||||
/// after calling Init().
|
||||
JWKSSnapshotPtr GetJWKSSnapshot() const;
|
||||
|
||||
private:
|
||||
/// Atomically replaces a JWKS snapshot with a new copy.
|
||||
void SetJWKSSnapshot(const JWKSSnapshotPtr& new_jwks);
|
||||
|
||||
/// Helper function for working thread which periodically checks the JWKS URL for
|
||||
/// updates.
|
||||
void UpdateJWKSThread();
|
||||
|
||||
/// Thread that runs UpdateJWKSThread(). This thread will exit when the
|
||||
/// shut_down_promise_ is set.
|
||||
std::unique_ptr<Thread> jwks_update_thread_;
|
||||
Promise<bool> shut_down_promise_;
|
||||
|
||||
/// JWKS URI.
|
||||
std::string jwks_uri_;
|
||||
|
||||
/// The snapshot of the current JWKS. When the checksum of downloaded JWKS json object
|
||||
/// has been changed, the public keys will be reloaded and the content of this pointer
|
||||
/// will be atomically swapped.
|
||||
JWKSSnapshotPtr current_jwks_;
|
||||
/// 64 bit checksum of current JWKS object.
|
||||
uint64_t current_jwks_checksum_ = 0;
|
||||
|
||||
/// Protects current_jwks_.
|
||||
mutable std::mutex current_jwks_lock_;
|
||||
};
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -405,9 +405,9 @@ TEST(JwtUtilTest, LoadJwksFile) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_FALSE(jwks->IsEmpty());
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
@@ -436,13 +436,12 @@ TEST(JwtUtilTest, LoadInvalidJwksFiles) {
|
||||
" ]"
|
||||
"}"));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file->Filename());
|
||||
Status status = jwt_helper.Init(jwks_file->Filename(), true);
|
||||
ASSERT_FALSE(status.ok());
|
||||
ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
|
||||
<< " Actual error: " << status.msg().msg();
|
||||
ASSERT_TRUE(status.GetDetail().find("'kid' property is required") != std::string::npos)
|
||||
<< "actual error: " << status.GetDetail();
|
||||
ASSERT_TRUE(jwt_helper.GetJWKS()->IsEmpty());
|
||||
|
||||
// Invalid JSON format, missing "]" and "}".
|
||||
jwks_file.reset(new TempTestDataFile(
|
||||
@@ -456,7 +455,7 @@ TEST(JwtUtilTest, LoadInvalidJwksFiles) {
|
||||
" \"n\": \"sttddbg-_yjXzcFpbMJB1fIFam9lQBeXWbTqzJwbuFbspHMsRowa8FaPw\","
|
||||
" \"e\": \"AQAB\""
|
||||
"}"));
|
||||
status = jwt_helper.Init(jwks_file->Filename());
|
||||
status = jwt_helper.Init(jwks_file->Filename(), true);
|
||||
ASSERT_FALSE(status.ok());
|
||||
ASSERT_TRUE(status.GetDetail().find("Missing a comma or ']' after an array element")
|
||||
!= std::string::npos)
|
||||
@@ -466,7 +465,7 @@ TEST(JwtUtilTest, LoadInvalidJwksFiles) {
|
||||
jwks_file.reset(new TempTestDataFile(
|
||||
Substitute(jwks_rsa_file_format, "", "RS256", rsa_pub_key_jwk_n, rsa_pub_key_jwk_e,
|
||||
"", "RS256", rsa_invalid_pub_key_jwk_n, rsa_pub_key_jwk_e)));
|
||||
status = jwt_helper.Init(jwks_file->Filename());
|
||||
status = jwt_helper.Init(jwks_file->Filename(), true);
|
||||
ASSERT_FALSE(status.ok());
|
||||
ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
|
||||
<< " Actual error: " << status.msg().msg();
|
||||
@@ -477,7 +476,7 @@ TEST(JwtUtilTest, LoadInvalidJwksFiles) {
|
||||
// JWKS with empty key value.
|
||||
jwks_file.reset(new TempTestDataFile(
|
||||
Substitute(jwks_rsa_file_format, kid_1, "RS256", "", "", kid_2, "RS256", "", "")));
|
||||
status = jwt_helper.Init(jwks_file->Filename());
|
||||
status = jwt_helper.Init(jwks_file->Filename(), true);
|
||||
ASSERT_FALSE(status.ok());
|
||||
ASSERT_TRUE(status.msg().msg().find("parsing key #0") != std::string::npos)
|
||||
<< " Actual error: " << status.msg().msg();
|
||||
@@ -493,9 +492,9 @@ TEST(JwtUtilTest, VerifyJwtHS256) {
|
||||
TempTestDataFile jwks_file(
|
||||
Substitute(jwks_hs_file_format, kid_1, "HS256", shared_secret));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
EXPECT_OK(status);
|
||||
ASSERT_EQ(1, jwks->GetHSKeyNum());
|
||||
|
||||
@@ -533,9 +532,9 @@ TEST(JwtUtilTest, VerifyJwtHS384) {
|
||||
TempTestDataFile jwks_file(
|
||||
Substitute(jwks_hs_file_format, kid_1, "HS384", shared_secret));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
EXPECT_OK(status);
|
||||
ASSERT_EQ(1, jwks->GetHSKeyNum());
|
||||
|
||||
@@ -573,9 +572,9 @@ TEST(JwtUtilTest, VerifyJwtHS512) {
|
||||
TempTestDataFile jwks_file(
|
||||
Substitute(jwks_hs_file_format, kid_1, "HS512", shared_secret));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
EXPECT_OK(status);
|
||||
ASSERT_EQ(1, jwks->GetHSKeyNum());
|
||||
|
||||
@@ -611,9 +610,9 @@ TEST(JwtUtilTest, VerifyJwtRS256) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
|
||||
@@ -665,9 +664,9 @@ TEST(JwtUtilTest, VerifyJwtRS384) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS384", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
|
||||
@@ -703,9 +702,9 @@ TEST(JwtUtilTest, VerifyJwtRS512) {
|
||||
rsa512_pub_key_jwk_n, rsa512_pub_key_jwk_e, kid_2, "RS512",
|
||||
rsa512_invalid_pub_key_jwk_n, rsa512_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
|
||||
@@ -741,9 +740,9 @@ TEST(JwtUtilTest, VerifyJwtPS256) {
|
||||
rsa1024_pub_key_jwk_n, rsa1024_pub_key_jwk_e, kid_2, "PS256",
|
||||
rsa_invalid_pub_key_jwk_n, rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
|
||||
@@ -779,9 +778,9 @@ TEST(JwtUtilTest, VerifyJwtPS384) {
|
||||
rsa2048_pub_key_jwk_n, rsa2048_pub_key_jwk_e, kid_2, "PS384",
|
||||
rsa_invalid_pub_key_jwk_n, rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
|
||||
@@ -817,9 +816,9 @@ TEST(JwtUtilTest, VerifyJwtPS512) {
|
||||
rsa4096_pub_key_jwk_n, rsa4096_pub_key_jwk_e, kid_2, "PS512",
|
||||
rsa_invalid_pub_key_jwk_n, rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(2, jwks->GetRSAPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupRSAPublicKey(kid_1);
|
||||
@@ -854,9 +853,9 @@ TEST(JwtUtilTest, VerifyJwtES256) {
|
||||
TempTestDataFile jwks_file(Substitute(jwks_ec_file_format, kid_1, "P-256",
|
||||
ecdsa256_pub_key_jwk_x, ecdsa256_pub_key_jwk_y));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(1, jwks->GetECPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupECPublicKey(kid_1);
|
||||
@@ -899,9 +898,9 @@ TEST(JwtUtilTest, VerifyJwtES384) {
|
||||
TempTestDataFile jwks_file(Substitute(jwks_ec_file_format, kid_1, "P-384",
|
||||
ecdsa384_pub_key_jwk_x, ecdsa384_pub_key_jwk_y));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(1, jwks->GetECPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupECPublicKey(kid_1);
|
||||
@@ -936,9 +935,9 @@ TEST(JwtUtilTest, VerifyJwtES512) {
|
||||
TempTestDataFile jwks_file(Substitute(jwks_ec_file_format, kid_1, "P-521",
|
||||
ecdsa521_pub_key_jwk_x, ecdsa521_pub_key_jwk_y));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
const JsonWebKeySet* jwks = jwt_helper.GetJWKS();
|
||||
JWKSSnapshotPtr jwks = jwt_helper.GetJWKS();
|
||||
ASSERT_EQ(1, jwks->GetECPublicKeyNum());
|
||||
|
||||
const JWTPublicKey* key1 = jwks->LookupECPublicKey(kid_1);
|
||||
@@ -994,7 +993,7 @@ TEST(JwtUtilTest, VerifyJwtFailMismatchingAlgorithms) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
|
||||
// Create a JWT token, but set mismatching algorithm.
|
||||
@@ -1023,7 +1022,7 @@ TEST(JwtUtilTest, VerifyJwtFailKeyNotFound) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
|
||||
// Create a JWT token with a key ID which can not be found in JWKS.
|
||||
@@ -1051,7 +1050,7 @@ TEST(JwtUtilTest, VerifyJwtTokenWithoutKeyId) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
|
||||
// Create a JWT token without key ID.
|
||||
@@ -1072,7 +1071,7 @@ TEST(JwtUtilTest, VerifyJwtFailTokenWithoutKeyId) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
|
||||
// Create a JWT token without key ID.
|
||||
@@ -1092,7 +1091,7 @@ TEST(JwtUtilTest, VerifyJwtFailTokenWithoutSignature) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
|
||||
// Create a JWT token without signature.
|
||||
@@ -1114,7 +1113,7 @@ TEST(JwtUtilTest, VerifyJwtFailExpiredToken) {
|
||||
rsa_pub_key_jwk_n, rsa_pub_key_jwk_e, kid_2, "RS256", rsa_invalid_pub_key_jwk_n,
|
||||
rsa_pub_key_jwk_e));
|
||||
JWTHelper jwt_helper;
|
||||
Status status = jwt_helper.Init(jwks_file.Filename());
|
||||
Status status = jwt_helper.Init(jwks_file.Filename(), true);
|
||||
EXPECT_OK(status);
|
||||
|
||||
// Create a JWT token and sign it with RS256.
|
||||
|
||||
@@ -35,8 +35,17 @@
|
||||
#include <rapidjson/filereadstream.h>
|
||||
|
||||
#include "common/names.h"
|
||||
#include "hash-util.h"
|
||||
#include "jwt-util-internal.h"
|
||||
#include "jwt-util.h"
|
||||
#include "kudu/util/curl_util.h"
|
||||
#include "kudu/util/faststring.h"
|
||||
#include "kudu/util/monotime.h"
|
||||
#include "util/kudu-status-util.h"
|
||||
#include "util/test-info.h"
|
||||
|
||||
DECLARE_int32(jwks_update_frequency_s);
|
||||
DECLARE_int32(jwks_pulling_timeout_s);
|
||||
|
||||
namespace impala {
|
||||
|
||||
@@ -47,7 +56,7 @@ using rapidjson::Value;
|
||||
// This class parses JWKS file.
|
||||
class JWKSetParser {
|
||||
public:
|
||||
JWKSetParser(JsonWebKeySet* jwks) : jwks_(jwks) {}
|
||||
JWKSetParser(JWKSSnapshot* jwks) : jwks_(jwks) {}
|
||||
|
||||
// Perform the parsing and populate JWKS's internal map. Return error status if
|
||||
// encountering any error.
|
||||
@@ -71,7 +80,7 @@ class JWKSetParser {
|
||||
}
|
||||
|
||||
private:
|
||||
JsonWebKeySet* jwks_;
|
||||
JWKSSnapshot* jwks_;
|
||||
|
||||
string NameOfTypeOfJsonValue(const Value& value) {
|
||||
switch (value.GetType()) {
|
||||
@@ -492,10 +501,11 @@ cleanup:
|
||||
}
|
||||
|
||||
//
|
||||
// JsonWebKeySet member functions.
|
||||
// JWKSSnapshot member functions.
|
||||
//
|
||||
|
||||
Status JsonWebKeySet::Init(const string& jwks_file_path) {
|
||||
// Load JWKS from the given local json file.
|
||||
Status JWKSSnapshot::LoadKeysFromFile(const string& jwks_file_path) {
|
||||
hs_key_map_.clear();
|
||||
rsa_pub_key_map_.clear();
|
||||
|
||||
@@ -525,11 +535,9 @@ Status JsonWebKeySet::Init(const string& jwks_file_path) {
|
||||
if (jwks_doc.HasParseError()) {
|
||||
return Status(
|
||||
TErrorCode::JWKS_PARSE_ERROR, GetParseError_En(jwks_doc.GetParseError()));
|
||||
}
|
||||
if (!jwks_doc.IsObject()) {
|
||||
} else if (!jwks_doc.IsObject()) {
|
||||
return Status(TErrorCode::JWKS_PARSE_ERROR, "root element must be a JSON Object");
|
||||
}
|
||||
if (!jwks_doc.HasMember("keys")) {
|
||||
} else if (!jwks_doc.HasMember("keys")) {
|
||||
return Status(TErrorCode::JWKS_PARSE_ERROR, "keys is required");
|
||||
}
|
||||
|
||||
@@ -537,7 +545,49 @@ Status JsonWebKeySet::Init(const string& jwks_file_path) {
|
||||
return jwks_parser.Parse(jwks_doc);
|
||||
}
|
||||
|
||||
void JsonWebKeySet::AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
// Download JWKS from the given URL with Kudu's EasyCurl wrapper.
|
||||
Status JWKSSnapshot::LoadKeysFromUrl(
|
||||
const std::string& jwks_url, uint64_t cur_jwks_checksum, bool* is_changed) {
|
||||
kudu::EasyCurl curl;
|
||||
kudu::faststring dst;
|
||||
Status status;
|
||||
|
||||
curl.set_timeout(
|
||||
kudu::MonoDelta::FromMilliseconds(FLAGS_jwks_pulling_timeout_s * 1000));
|
||||
curl.set_verify_peer(false);
|
||||
// TODO support CurlAuthType by calling kudu::EasyCurl::set_auth().
|
||||
KUDU_RETURN_IF_ERROR(curl.FetchURL(jwks_url, &dst),
|
||||
Substitute("Error downloading JWKS from '$0'", jwks_url));
|
||||
if (dst.size() > 0) {
|
||||
// Verify if the checksum of the downloaded JWKS has been changed.
|
||||
jwks_checksum_ = HashUtil::FastHash64(dst.data(), dst.size(), /*seed*/ 0xcafebeef);
|
||||
if (jwks_checksum_ == cur_jwks_checksum) return Status::OK();
|
||||
*is_changed = true;
|
||||
// Append '\0' so that the in-memory object could be parsed as StringStream.
|
||||
dst.push_back('\0');
|
||||
#ifndef NDEBUG
|
||||
VLOG(3) << "JWKS: " << dst.data();
|
||||
#endif
|
||||
// Parse in-memory JWKS JSON object as StringStream.
|
||||
Document jwks_doc;
|
||||
jwks_doc.Parse((char*)dst.data());
|
||||
if (jwks_doc.HasParseError()) {
|
||||
status = Status(
|
||||
TErrorCode::JWKS_PARSE_ERROR, GetParseError_En(jwks_doc.GetParseError()));
|
||||
} else if (!jwks_doc.IsObject()) {
|
||||
status = Status(TErrorCode::JWKS_PARSE_ERROR, "root element must be a JSON Object");
|
||||
} else if (!jwks_doc.HasMember("keys")) {
|
||||
status = Status(TErrorCode::JWKS_PARSE_ERROR, "keys is required");
|
||||
} else {
|
||||
// Load and initialize public keys.
|
||||
JWKSetParser jwks_parser(this);
|
||||
status = jwks_parser.Parse(jwks_doc);
|
||||
}
|
||||
}
|
||||
return status;
|
||||
}
|
||||
|
||||
void JWKSSnapshot::AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
if (hs_key_map_.find(key_id) == hs_key_map_.end()) {
|
||||
hs_key_map_[key_id].reset(jwk_pub_key);
|
||||
} else {
|
||||
@@ -545,7 +595,7 @@ void JsonWebKeySet::AddHSKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
}
|
||||
}
|
||||
|
||||
void JsonWebKeySet::AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
void JWKSSnapshot::AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
if (rsa_pub_key_map_.find(key_id) == rsa_pub_key_map_.end()) {
|
||||
rsa_pub_key_map_[key_id].reset(jwk_pub_key);
|
||||
} else {
|
||||
@@ -553,7 +603,7 @@ void JsonWebKeySet::AddRSAPublicKey(std::string key_id, JWTPublicKey* jwk_pub_ke
|
||||
}
|
||||
}
|
||||
|
||||
void JsonWebKeySet::AddECPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
void JWKSSnapshot::AddECPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key) {
|
||||
if (ec_pub_key_map_.find(key_id) == ec_pub_key_map_.end()) {
|
||||
ec_pub_key_map_[key_id].reset(jwk_pub_key);
|
||||
} else {
|
||||
@@ -561,7 +611,7 @@ void JsonWebKeySet::AddECPublicKey(std::string key_id, JWTPublicKey* jwk_pub_key
|
||||
}
|
||||
}
|
||||
|
||||
const JWTPublicKey* JsonWebKeySet::LookupHSKey(const std::string& kid) const {
|
||||
const JWTPublicKey* JWKSSnapshot::LookupHSKey(const std::string& kid) const {
|
||||
auto find_it = hs_key_map_.find(kid);
|
||||
if (find_it == hs_key_map_.end()) {
|
||||
// Could not find key for the given key ID.
|
||||
@@ -570,7 +620,7 @@ const JWTPublicKey* JsonWebKeySet::LookupHSKey(const std::string& kid) const {
|
||||
return find_it->second.get();
|
||||
}
|
||||
|
||||
const JWTPublicKey* JsonWebKeySet::LookupRSAPublicKey(const std::string& kid) const {
|
||||
const JWTPublicKey* JWKSSnapshot::LookupRSAPublicKey(const std::string& kid) const {
|
||||
auto find_it = rsa_pub_key_map_.find(kid);
|
||||
if (find_it == rsa_pub_key_map_.end()) {
|
||||
// Could not find key for the given key ID.
|
||||
@@ -579,7 +629,7 @@ const JWTPublicKey* JsonWebKeySet::LookupRSAPublicKey(const std::string& kid) co
|
||||
return find_it->second.get();
|
||||
}
|
||||
|
||||
const JWTPublicKey* JsonWebKeySet::LookupECPublicKey(const std::string& kid) const {
|
||||
const JWTPublicKey* JWKSSnapshot::LookupECPublicKey(const std::string& kid) const {
|
||||
auto find_it = ec_pub_key_map_.find(kid);
|
||||
if (find_it == ec_pub_key_map_.end()) {
|
||||
// Could not find key for the given key ID.
|
||||
@@ -588,6 +638,95 @@ const JWTPublicKey* JsonWebKeySet::LookupECPublicKey(const std::string& kid) con
|
||||
return find_it->second.get();
|
||||
}
|
||||
|
||||
//
|
||||
// JWKSMgr member functions.
|
||||
//
|
||||
|
||||
JWKSMgr::~JWKSMgr() {
|
||||
shut_down_promise_.Set(true);
|
||||
if (jwks_update_thread_ != nullptr) jwks_update_thread_->Join();
|
||||
}
|
||||
|
||||
Status JWKSMgr::Init(const std::string& jwks_uri, bool is_local_file) {
|
||||
Status status;
|
||||
jwks_uri_ = jwks_uri;
|
||||
std::shared_ptr<JWKSSnapshot> new_jwks = std::make_shared<JWKSSnapshot>();
|
||||
if (is_local_file) {
|
||||
status = new_jwks->LoadKeysFromFile(jwks_uri);
|
||||
if (!status.ok()) {
|
||||
LOG(ERROR) << "Failed to load JWKS: " << status;
|
||||
return status;
|
||||
}
|
||||
SetJWKSSnapshot(new_jwks);
|
||||
} else {
|
||||
if (FLAGS_jwks_update_frequency_s <= 0) {
|
||||
LOG(WARNING) << "Invalid value for flag jwks_update_frequency_s: "
|
||||
<< FLAGS_jwks_update_frequency_s << ", use default value 60.";
|
||||
FLAGS_jwks_update_frequency_s = 60;
|
||||
}
|
||||
if (FLAGS_jwks_pulling_timeout_s <= 0) {
|
||||
LOG(WARNING) << "Invalid value for flag jwks_pulling_timeout_s: "
|
||||
<< FLAGS_jwks_pulling_timeout_s << ", use default value 10.";
|
||||
FLAGS_jwks_pulling_timeout_s = 10;
|
||||
}
|
||||
|
||||
bool is_changed = false;
|
||||
status = new_jwks->LoadKeysFromUrl(jwks_uri, current_jwks_checksum_, &is_changed);
|
||||
if (!status.ok()) {
|
||||
LOG(ERROR) << "Failed to load JWKS: " << status;
|
||||
return status;
|
||||
}
|
||||
DCHECK(is_changed);
|
||||
if (is_changed) SetJWKSSnapshot(new_jwks);
|
||||
|
||||
// Start a working thread to periodically check the JWKS URL for updates.
|
||||
RETURN_IF_ERROR(Thread::Create("impala-server", "JWKS-mgr",
|
||||
&JWKSMgr::UpdateJWKSThread, this, &jwks_update_thread_));
|
||||
}
|
||||
|
||||
if (new_jwks->IsEmpty()) LOG(WARNING) << "JWKS file is empty.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void JWKSMgr::UpdateJWKSThread() {
|
||||
std::shared_ptr<JWKSSnapshot> new_jwks;
|
||||
int64_t timeout_millis = FLAGS_jwks_update_frequency_s * 1000;
|
||||
while (true) {
|
||||
// This Get() will time out until shutdown, when the promise is set.
|
||||
bool timed_out;
|
||||
shut_down_promise_.Get(timeout_millis, &timed_out);
|
||||
if (!timed_out) break;
|
||||
|
||||
new_jwks = std::make_shared<JWKSSnapshot>();
|
||||
bool is_changed = false;
|
||||
Status status =
|
||||
new_jwks->LoadKeysFromUrl(jwks_uri_, current_jwks_checksum_, &is_changed);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "Failed to update JWKS: " << status;
|
||||
} else if (is_changed) {
|
||||
SetJWKSSnapshot(new_jwks);
|
||||
}
|
||||
new_jwks.reset();
|
||||
}
|
||||
// The promise must be set to true.
|
||||
DCHECK(shut_down_promise_.IsSet());
|
||||
DCHECK(shut_down_promise_.Get());
|
||||
}
|
||||
|
||||
JWKSSnapshotPtr JWKSMgr::GetJWKSSnapshot() const {
|
||||
std::lock_guard<std::mutex> l(current_jwks_lock_);
|
||||
DCHECK(current_jwks_.get() != nullptr);
|
||||
JWKSSnapshotPtr jwks = current_jwks_;
|
||||
return jwks;
|
||||
}
|
||||
|
||||
void JWKSMgr::SetJWKSSnapshot(const JWKSSnapshotPtr& new_jwks) {
|
||||
std::lock_guard<std::mutex> l(current_jwks_lock_);
|
||||
DCHECK(new_jwks.get() != nullptr);
|
||||
current_jwks_ = new_jwks;
|
||||
current_jwks_checksum_ = new_jwks->GetChecksum();
|
||||
}
|
||||
|
||||
//
|
||||
// JWTHelper member functions.
|
||||
//
|
||||
@@ -603,14 +742,18 @@ void JWTHelper::TokenDeleter::operator()(JWTHelper::JWTDecodedToken* token) cons
|
||||
if (token != nullptr) delete token;
|
||||
};
|
||||
|
||||
Status JWTHelper::Init(const std::string& jwks_file_path) {
|
||||
jwks_.reset(new JsonWebKeySet());
|
||||
RETURN_IF_ERROR(jwks_->Init(jwks_file_path));
|
||||
if (jwks_->IsEmpty()) LOG(WARNING) << "JWKS file is empty.";
|
||||
initialized_ = true;
|
||||
Status JWTHelper::Init(const std::string& jwks_uri, bool is_local_file) {
|
||||
jwks_mgr_.reset(new JWKSMgr());
|
||||
RETURN_IF_ERROR(jwks_mgr_->Init(jwks_uri, is_local_file));
|
||||
if (!initialized_) initialized_ = true;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
JWKSSnapshotPtr JWTHelper::GetJWKS() const {
|
||||
DCHECK(initialized_);
|
||||
return jwks_mgr_->GetJWKSSnapshot();
|
||||
}
|
||||
|
||||
// Decode the given JWT token.
|
||||
Status JWTHelper::Decode(const string& token, UniqueJWTDecodedToken& decoded_token_out) {
|
||||
Status status;
|
||||
@@ -642,18 +785,24 @@ Status JWTHelper::Decode(const string& token, UniqueJWTDecodedToken& decoded_tok
|
||||
|
||||
// Validate the token's signature with public key.
|
||||
Status JWTHelper::Verify(const JWTDecodedToken* decoded_token) const {
|
||||
Status status;
|
||||
DCHECK(initialized_);
|
||||
DCHECK(decoded_token != nullptr);
|
||||
|
||||
if (decoded_token->decoded_jwt_.get_signature().empty()) {
|
||||
// Don't accept JWT without a signature.
|
||||
return Status(TErrorCode::JWT_VERIFY_FAILED, "Unsecured JWT");
|
||||
} else if (jwks_ == nullptr) {
|
||||
// Skip to signature validation if there is no public key.
|
||||
} else if (jwks_mgr_ == nullptr) {
|
||||
// Skip to signature validation if JWKS file or url is not specified.
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status status;
|
||||
JWKSSnapshotPtr jwks = GetJWKS();
|
||||
if (jwks->IsEmpty()) {
|
||||
return Status(
|
||||
TErrorCode::JWT_VERIFY_FAILED, "Verification failed, no matching valid key");
|
||||
};
|
||||
|
||||
try {
|
||||
string algorithm =
|
||||
boost::algorithm::to_lower_copy(decoded_token->decoded_jwt_.get_algorithm());
|
||||
@@ -664,11 +813,11 @@ Status JWTHelper::Verify(const JWTDecodedToken* decoded_token) const {
|
||||
|
||||
const JWTPublicKey* pub_key = nullptr;
|
||||
if (prefix == "hs") {
|
||||
pub_key = jwks_->LookupHSKey(key_id);
|
||||
pub_key = jwks->LookupHSKey(key_id);
|
||||
} else if (prefix == "rs" || prefix == "ps") {
|
||||
pub_key = jwks_->LookupRSAPublicKey(key_id);
|
||||
pub_key = jwks->LookupRSAPublicKey(key_id);
|
||||
} else if (prefix == "es") {
|
||||
pub_key = jwks_->LookupECPublicKey(key_id);
|
||||
pub_key = jwks->LookupECPublicKey(key_id);
|
||||
} else {
|
||||
return Status(TErrorCode::JWT_VERIFY_FAILED,
|
||||
Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
|
||||
@@ -682,13 +831,13 @@ Status JWTHelper::Verify(const JWTDecodedToken* decoded_token) const {
|
||||
// According to RFC 7517 (JSON Web Key), 'kid' is OPTIONAL so it's possible there
|
||||
// is no key id in the token's header. In this case, get all of public keys from
|
||||
// JWKS for the family of algorithms.
|
||||
const JsonWebKeySet::JWTPublicKeyMap* key_map = nullptr;
|
||||
const JWKSSnapshot::JWTPublicKeyMap* key_map = nullptr;
|
||||
if (prefix == "hs") {
|
||||
key_map = jwks_->GetAllHSKeys();
|
||||
key_map = jwks->GetAllHSKeys();
|
||||
} else if (prefix == "rs" || prefix == "ps") {
|
||||
key_map = jwks_->GetAllRSAPublicKeys();
|
||||
key_map = jwks->GetAllRSAPublicKeys();
|
||||
} else if (prefix == "es") {
|
||||
key_map = jwks_->GetAllECPublicKeys();
|
||||
key_map = jwks->GetAllECPublicKeys();
|
||||
} else {
|
||||
return Status(TErrorCode::JWT_VERIFY_FAILED,
|
||||
Substitute("Unsupported cryptographic algorithm '$0' for JWT", algorithm));
|
||||
|
||||
@@ -25,7 +25,8 @@
|
||||
|
||||
namespace impala {
|
||||
|
||||
class JsonWebKeySet;
|
||||
class JWKSSnapshot;
|
||||
class JWKSMgr;
|
||||
|
||||
/// JSON Web Token (JWT) is an Internet proposed standard for creating data with optional
|
||||
/// signature and/or optional encryption whose payload holds JSON that asserts some
|
||||
@@ -33,7 +34,7 @@ class JsonWebKeySet;
|
||||
/// private key.
|
||||
/// This class works as wrapper for jwt-cpp. It provides APIs to decode/verify JWT token,
|
||||
/// and extracts custom claim from the payload of JWT token.
|
||||
/// JsonWebKeySet is read only after Init() is called. The class is thread safe.
|
||||
/// The class is thread safe.
|
||||
class JWTHelper {
|
||||
public:
|
||||
/// Opaque types for storing the JWT decoded token. This allows us to avoid including
|
||||
@@ -53,16 +54,17 @@ class JWTHelper {
|
||||
/// Return the single instance.
|
||||
static JWTHelper* GetInstance() { return jwt_helper_; }
|
||||
|
||||
/// Load JWKS from a JSON file. Returns an error if problems were encountered.
|
||||
Status Init(const std::string& jwks_file_path);
|
||||
/// Load JWKS from a given local JSON file or URL. Returns an error if problems were
|
||||
/// encountered.
|
||||
Status Init(const std::string& jwks_uri, bool is_local_file);
|
||||
|
||||
/// Decode the given JWT token. The decoding result is stored in decoded_token_.
|
||||
/// Return Status::OK if the decoding is successful.
|
||||
static Status Decode(
|
||||
const std::string& token, UniqueJWTDecodedToken& decoded_token_out);
|
||||
|
||||
/// Verify the token's signature with the given JWKS. The token should be already
|
||||
/// decoded by calling Decode().
|
||||
/// Verify the token's signature with the JWKS. The token should be already decoded by
|
||||
/// calling Decode().
|
||||
/// Return Status::OK if the verification is successful.
|
||||
Status Verify(const JWTDecodedToken* decoded_token) const;
|
||||
|
||||
@@ -71,8 +73,8 @@ class JWTHelper {
|
||||
static Status GetCustomClaimUsername(const JWTDecodedToken* decoded_token,
|
||||
const std::string& custom_claim_username, std::string& username);
|
||||
|
||||
/// Return Json Web Key Set. It's called only by unit-test code.
|
||||
const JsonWebKeySet* GetJWKS() const { return jwks_.get(); }
|
||||
/// Return snapshot of JWKS.
|
||||
std::shared_ptr<const JWKSSnapshot> GetJWKS() const;
|
||||
|
||||
private:
|
||||
/// Single instance.
|
||||
@@ -81,9 +83,9 @@ class JWTHelper {
|
||||
/// Set it as TRUE when Init() is called.
|
||||
bool initialized_ = false;
|
||||
|
||||
/// Json Web Key Set (JWKS) for Json Web Token (JWT) verification.
|
||||
/// JWKS Manager for Json Web Token (JWT) verification.
|
||||
/// Only one instance per daemon.
|
||||
std::unique_ptr<JsonWebKeySet> jwks_;
|
||||
std::unique_ptr<JWKSMgr> jwks_mgr_;
|
||||
};
|
||||
|
||||
} // namespace impala
|
||||
|
||||
@@ -464,8 +464,8 @@ def get_toolchain_downloads():
|
||||
toolchain_packages += [llvm_package, llvm_package_asserts, gcc_package]
|
||||
toolchain_packages += map(ToolchainPackage,
|
||||
["avro", "binutils", "boost", "breakpad", "bzip2", "cctz", "cmake", "crcutil",
|
||||
"flatbuffers", "gdb", "gflags", "glog", "gperftools", "gtest", "jwt-cpp", "libev",
|
||||
"libunwind", "lz4", "openldap", "openssl", "orc", "protobuf", "python",
|
||||
"curl", "flatbuffers", "gdb", "gflags", "glog", "gperftools", "gtest", "jwt-cpp",
|
||||
"libev", "libunwind", "lz4", "openldap", "openssl", "orc", "protobuf", "python",
|
||||
"rapidjson", "re2", "snappy", "thrift", "tpc-h", "tpc-ds", "zlib", "zstd"])
|
||||
# Check whether this platform is supported (or whether a valid custom toolchain
|
||||
# has been provided).
|
||||
|
||||
@@ -87,6 +87,8 @@ export IMPALA_CMAKE_VERSION=3.14.3
|
||||
unset IMPALA_CMAKE_URL
|
||||
export IMPALA_CRCUTIL_VERSION=440ba7babeff77ffad992df3a10c767f184e946e-p2
|
||||
unset IMPALA_CRCUTIL_URL
|
||||
export IMPALA_CURL_VERSION=7.78.0
|
||||
unset IMPALA_CURL_URL
|
||||
export IMPALA_CYRUS_SASL_VERSION=2.1.23
|
||||
unset IMPALA_CYRUS_SASL_URL
|
||||
export IMPALA_FLATBUFFERS_VERSION=1.6.0
|
||||
|
||||
@@ -167,7 +167,7 @@ testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2
|
||||
testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2_default.expected.txt
|
||||
testdata/impala-profiles/impala_profile_log_tpcds_compute_stats_v2_extended.expected.txt
|
||||
testdata/hive_benchmark/grepTiny/part-00000
|
||||
testdata/jwt/jwks_rs256.json
|
||||
testdata/jwt/*.json
|
||||
testdata/tzdb/2017c.zip
|
||||
testdata/tzdb/2017c-corrupt.zip
|
||||
testdata/tzdb_tiny/*
|
||||
|
||||
54
cmake_modules/FindCurl.cmake
Normal file
54
cmake_modules/FindCurl.cmake
Normal file
@@ -0,0 +1,54 @@
|
||||
##############################################################################
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
##############################################################################
|
||||
|
||||
# - Find CURL (curl.h, libcurl.a, libcurl.so, and libglog.so.4) with CURL_ROOT
|
||||
# hinting a location
|
||||
#
|
||||
# This module defines
|
||||
# CURL_INCLUDE_DIR, directory containing curl headers
|
||||
# CURL_LIBS, directory containing curl libraries
|
||||
# CURL_STATIC_LIB, path to libcurl.a
|
||||
|
||||
set(CURL_SEARCH_HEADER_PATHS ${CURL_ROOT}/include)
|
||||
set(CURL_SEARCH_LIB_PATH ${CURL_ROOT}/lib)
|
||||
|
||||
find_path(CURL_INCLUDE_DIR NAMES curl/curl.h PATHS
|
||||
${CURL_SEARCH_HEADER_PATHS}
|
||||
# make sure we don't accidentally pick up a different version
|
||||
NO_DEFAULT_PATH
|
||||
)
|
||||
|
||||
find_library(CURL_STATIC_LIB NAMES libcurl.a PATHS ${CURL_SEARCH_LIB_PATH})
|
||||
find_library(CURL_SHARED_LIB NAMES libcurl.so PATHS ${CURL_SEARCH_LIB_PATH})
|
||||
|
||||
if (NOT CURL_INCLUDE_DIR OR NOT CURL_STATIC_LIB)
|
||||
message(FATAL_ERROR "Curl includes and libraries NOT found. "
|
||||
"Looked for headers in ${CURL_SEARCH_HEADER_PATH}, "
|
||||
"and for libs in ${CURL_SEARCH_LIB_PATH}")
|
||||
set(CURL_FOUND FALSE)
|
||||
else()
|
||||
set(CURL_FOUND TRUE)
|
||||
endif ()
|
||||
|
||||
mark_as_advanced(
|
||||
CURL_INCLUDE_DIR
|
||||
CURL_STATIC_LIB
|
||||
CURL_SHARED_LIB
|
||||
CURL_FOUND
|
||||
)
|
||||
@@ -54,9 +54,25 @@ class CustomClusterRunner {
|
||||
*/
|
||||
public static int StartImpalaCluster(String args, Map<String, String> env,
|
||||
String startArgs) throws IOException, InterruptedException {
|
||||
ProcessBuilder pb =
|
||||
new ProcessBuilder(new String[] {"start-impala-cluster.py", "--impalad_args",
|
||||
args, "--catalogd_args", args, "--state_store_args", args, startArgs});
|
||||
return StartImpalaCluster(args, args, args, env, startArgs);
|
||||
}
|
||||
|
||||
public static int StartImpalaCluster(String impaladArgs, String catalogdArgs,
|
||||
String statestoredArgs) throws IOException, InterruptedException {
|
||||
return StartImpalaCluster(
|
||||
impaladArgs, catalogdArgs, statestoredArgs, new HashMap<String, String>(), "");
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts Impala, setting environment variables in 'env', and passing 'impalad_args',
|
||||
* 'catalogd_args', 'statestored_args', and 'startArgs' to start-impala-cluster.py.
|
||||
*/
|
||||
public static int StartImpalaCluster(String impaladArgs, String catalogdArgs,
|
||||
String statestoredArgs, Map<String, String> env, String startArgs)
|
||||
throws IOException, InterruptedException {
|
||||
ProcessBuilder pb = new ProcessBuilder(new String[] {"start-impala-cluster.py",
|
||||
"--impalad_args", impaladArgs, "--catalogd_args", catalogdArgs,
|
||||
"--state_store_args", statestoredArgs, startArgs});
|
||||
pb.redirectErrorStream(true);
|
||||
Map<String, String> origEnv = pb.environment();
|
||||
origEnv.putAll(env);
|
||||
|
||||
@@ -21,7 +21,15 @@ import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -30,6 +38,7 @@ import org.apache.hive.service.rpc.thrift.*;
|
||||
import org.apache.impala.util.Metrics;
|
||||
import org.apache.thrift.transport.THttpClient;
|
||||
import org.apache.thrift.protocol.TBinaryProtocol;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@@ -38,13 +47,71 @@ import org.junit.Test;
|
||||
* JWT authentication is being used.
|
||||
*/
|
||||
public class JwtHttpTest {
|
||||
Metrics metrics = new Metrics();
|
||||
Metrics metrics_ = new Metrics();
|
||||
|
||||
/* Since we don't have Java version of JWT library, we use pre-calculated JWT token.
|
||||
* The token and JWK set used in this test case were generated by using BE unit-test
|
||||
* function JwtUtilTest::VerifyJwtRS256.
|
||||
*/
|
||||
String jwtToken_ =
|
||||
"eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
|
||||
+ "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
|
||||
+ "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
|
||||
+ "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
|
||||
+ "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
|
||||
+ "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
|
||||
+ "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
|
||||
|
||||
/*
|
||||
* Create JWKS file in the root directory of WebServer if it's set as true.
|
||||
*/
|
||||
boolean createJWKSForWebServer_ = false;
|
||||
|
||||
public void setUp(String extraArgs) throws Exception {
|
||||
int ret = CustomClusterRunner.StartImpalaCluster(extraArgs);
|
||||
assertEquals(ret, 0);
|
||||
}
|
||||
|
||||
public void setUp(String impaladArgs, String catalogdArgs, String statestoredArgs)
|
||||
throws Exception {
|
||||
if (createJWKSForWebServer_) createTempJWKSInWebServerRootDir("jwks_rs256.json");
|
||||
|
||||
int ret = CustomClusterRunner.StartImpalaCluster(
|
||||
impaladArgs, catalogdArgs, statestoredArgs);
|
||||
assertEquals(ret, 0);
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() throws Exception {
|
||||
// Leave a cluster running with the default configuration, then delete temporary
|
||||
// JWKS file.
|
||||
CustomClusterRunner.StartImpalaCluster();
|
||||
if (createJWKSForWebServer_) deleteTempJWKSFromWebServerRootDir();
|
||||
metrics_.Close();
|
||||
}
|
||||
|
||||
private void createTempJWKSInWebServerRootDir(String srcFilename) {
|
||||
Path srcJwksPath =
|
||||
(Path) Paths.get(System.getenv("IMPALA_HOME"), "testdata", "jwt", srcFilename);
|
||||
Path tempJwksPath =
|
||||
(Path) Paths.get(System.getenv("IMPALA_HOME"), "www", "temp_jwks.json");
|
||||
try {
|
||||
Files.copy(srcJwksPath, tempJwksPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
} catch (IOException e) {
|
||||
fail("Failed to copy file: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteTempJWKSFromWebServerRootDir() {
|
||||
Path tempJwksPath =
|
||||
(Path) Paths.get(System.getenv("IMPALA_HOME"), "www", "temp_jwks.json");
|
||||
try {
|
||||
Files.delete(tempJwksPath);
|
||||
} catch (IOException e) {
|
||||
fail("Failed to delete file: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
static void verifySuccess(TStatus status) throws Exception {
|
||||
if (status.getStatusCode() == TStatusCode.SUCCESS_STATUS
|
||||
|| status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS) {
|
||||
@@ -78,24 +145,23 @@ public class JwtHttpTest {
|
||||
private void verifyJwtAuthMetrics(long expectedAuthSuccess, long expectedAuthFailure)
|
||||
throws Exception {
|
||||
long actualAuthSuccess =
|
||||
(long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
|
||||
(long) metrics_.getMetric("impala.thrift-server.hiveserver2-http-frontend."
|
||||
+ "total-jwt-token-auth-success");
|
||||
assertEquals(expectedAuthSuccess, actualAuthSuccess);
|
||||
long actualAuthFailure =
|
||||
(long) metrics.getMetric("impala.thrift-server.hiveserver2-http-frontend."
|
||||
(long) metrics_.getMetric("impala.thrift-server.hiveserver2-http-frontend."
|
||||
+ "total-jwt-token-auth-failure");
|
||||
assertEquals(expectedAuthFailure, actualAuthFailure);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if sessions are authenticated by verifying the JWT token for connections
|
||||
* to the HTTP hiveserver2 endpoint.
|
||||
* Since we don't have Java version of JWT library, we use pre-calculated JWT token
|
||||
* and JWKS. The token and JWK set used in this test case were generated by using
|
||||
* BE unit-test function JwtUtilTest::VerifyJwtRS256.
|
||||
* to the HTTP hiveserver2 endpoint. The JWKS for JWT verification is specified as
|
||||
* local json file.
|
||||
*/
|
||||
@Test
|
||||
public void testJwtAuth() throws Exception {
|
||||
createJWKSForWebServer_ = false;
|
||||
String jwksFilename =
|
||||
new File(System.getenv("IMPALA_HOME"), "testdata/jwt/jwks_rs256.json").getPath();
|
||||
setUp(String.format(
|
||||
@@ -106,15 +172,7 @@ public class JwtHttpTest {
|
||||
Map<String, String> headers = new HashMap<String, String>();
|
||||
|
||||
// Case 1: Authenticate with valid JWT Token in HTTP header.
|
||||
String jwtToken =
|
||||
"eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
|
||||
+ "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
|
||||
+ "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
|
||||
+ "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
|
||||
+ "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
|
||||
+ "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
|
||||
+ "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
|
||||
headers.put("Authorization", "Bearer " + jwtToken);
|
||||
headers.put("Authorization", "Bearer " + jwtToken_);
|
||||
headers.put("X-Forwarded-For", "127.0.0.1");
|
||||
transport.setCustomHeaders(headers);
|
||||
transport.open();
|
||||
@@ -142,7 +200,7 @@ public class JwtHttpTest {
|
||||
transport.setCustomHeaders(headers);
|
||||
try {
|
||||
openResp = client.OpenSession(openReq);
|
||||
fail("Exception exception.");
|
||||
fail("Exception expected.");
|
||||
} catch (Exception e) {
|
||||
verifyJwtAuthMetrics(3, 1);
|
||||
assertEquals(e.getMessage(), "HTTP Response code: 401");
|
||||
@@ -154,7 +212,7 @@ public class JwtHttpTest {
|
||||
transport.setCustomHeaders(headers);
|
||||
try {
|
||||
openResp = client.OpenSession(openReq);
|
||||
fail("Exception exception.");
|
||||
fail("Exception expected.");
|
||||
} catch (Exception e) {
|
||||
// JWT authentication is not invoked.
|
||||
verifyJwtAuthMetrics(3, 1);
|
||||
@@ -166,7 +224,7 @@ public class JwtHttpTest {
|
||||
transport.setCustomHeaders(headers);
|
||||
try {
|
||||
openResp = client.OpenSession(openReq);
|
||||
fail("Exception exception.");
|
||||
fail("Exception expected.");
|
||||
} catch (Exception e) {
|
||||
// JWT authentication is not invoked.
|
||||
verifyJwtAuthMetrics(3, 1);
|
||||
@@ -176,10 +234,12 @@ public class JwtHttpTest {
|
||||
|
||||
/**
|
||||
* Tests if sessions are authenticated by verifying the JWT token for connections
|
||||
* to the HTTP hiveserver2 endpoint.
|
||||
* to the HTTP hiveserver2 endpoint. The JWKS for JWT verification is not specified
|
||||
* and JWT signatures are not verified.
|
||||
*/
|
||||
@Test
|
||||
public void testJwtAuthNotVerifySig() throws Exception {
|
||||
createJWKSForWebServer_ = false;
|
||||
// Start Impala without jwt_validate_signature as false so that the signature of
|
||||
// JWT token will not be validated.
|
||||
setUp("--jwt_token_auth=true --jwt_validate_signature=false "
|
||||
@@ -188,16 +248,7 @@ public class JwtHttpTest {
|
||||
Map<String, String> headers = new HashMap<String, String>();
|
||||
|
||||
// Case 1: Authenticate with valid JWT Token in HTTP header.
|
||||
// The Token was generated by BE unit-test function JwtUtilTest::VerifyJwtRS256.
|
||||
String jwtToken =
|
||||
"eyJhbGciOiJSUzI1NiIsImtpZCI6InB1YmxpYzpjNDI0YjY3Yi1mZTI4LTQ1ZDctYjAxNS1m"
|
||||
+ "NzlkYTUwYjViMjEiLCJ0eXAiOiJKV1MifQ.eyJpc3MiOiJhdXRoMCIsInVzZXJuYW1lIjoia"
|
||||
+ "W1wYWxhIn0.OW5H2SClLlsotsCarTHYEbqlbRh43LFwOyo9WubpNTwE7hTuJDsnFoVrvHiWI"
|
||||
+ "02W69TZNat7DYcC86A_ogLMfNXagHjlMFJaRnvG5Ekag8NRuZNJmHVqfX-qr6x7_8mpOdU55"
|
||||
+ "4kc200pqbpYLhhuK4Qf7oT7y9mOrtNrUKGDCZ0Q2y_mizlbY6SMg4RWqSz0RQwJbRgXIWSgc"
|
||||
+ "bZd0GbD_MQQ8x7WRE4nluU-5Fl4N2Wo8T9fNTuxALPiuVeIczO25b5n4fryfKasSgaZfmk0C"
|
||||
+ "oOJzqbtmQxqiK9QNSJAiH2kaqMwLNgAdgn8fbd-lB1RAEGeyPH8Px8ipqcKsPk0bg";
|
||||
headers.put("Authorization", "Bearer " + jwtToken);
|
||||
headers.put("Authorization", "Bearer " + jwtToken_);
|
||||
headers.put("X-Forwarded-For", "127.0.0.1");
|
||||
transport.setCustomHeaders(headers);
|
||||
transport.open();
|
||||
@@ -214,4 +265,103 @@ public class JwtHttpTest {
|
||||
// Two more successful authentications - for the Exec() and the Fetch().
|
||||
verifyJwtAuthMetrics(3, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if sessions are authenticated by verifying the JWT token for connections
|
||||
* to the HTTP hiveserver2 endpoint. The JWKS for JWT verification is specified as
|
||||
* HTTP URL to the statestore Web server.
|
||||
*/
|
||||
@Test
|
||||
public void testJwtAuthWithJwksHttpUrl() throws Exception {
|
||||
createJWKSForWebServer_ = true;
|
||||
String statestoreWebserverArgs = "--webserver_port=25010";
|
||||
String jwksHttpUrl = "http://localhost:25010/www/temp_jwks.json";
|
||||
String impaladJwtArgs = String.format("--jwt_token_auth=true "
|
||||
+ "--jwt_validate_signature=true --jwks_url=%s "
|
||||
+ "--jwks_update_frequency_s=1 --jwt_allow_without_tls=true",
|
||||
jwksHttpUrl);
|
||||
setUp(impaladJwtArgs, "", statestoreWebserverArgs);
|
||||
|
||||
THttpClient transport = new THttpClient("http://localhost:28000");
|
||||
Map<String, String> headers = new HashMap<String, String>();
|
||||
|
||||
// Authenticate with valid JWT Token in HTTP header.
|
||||
headers.put("Authorization", "Bearer " + jwtToken_);
|
||||
headers.put("X-Forwarded-For", "127.0.0.1");
|
||||
transport.setCustomHeaders(headers);
|
||||
transport.open();
|
||||
TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
|
||||
|
||||
// Open a session which will get username 'impala' from JWT token and use it as
|
||||
// login user.
|
||||
TOpenSessionReq openReq = new TOpenSessionReq();
|
||||
TOpenSessionResp openResp = client.OpenSession(openReq);
|
||||
// One successful authentication.
|
||||
verifyJwtAuthMetrics(1, 0);
|
||||
// Running a query should succeed.
|
||||
TOperationHandle operationHandle = execAndFetch(
|
||||
client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
|
||||
// Two more successful authentications - for the Exec() and the Fetch().
|
||||
verifyJwtAuthMetrics(3, 0);
|
||||
|
||||
// Update JWKS in the root directory of Web server.
|
||||
createTempJWKSInWebServerRootDir("jwks_es256.json");
|
||||
// Sleep long enough for coordinator to update JWKS from Web server.
|
||||
Thread.sleep(3000);
|
||||
// Authenticate fails due JWT verification failure since the RS256 public key cannot
|
||||
// be found in the JWKS.
|
||||
transport.setCustomHeaders(headers);
|
||||
try {
|
||||
openResp = client.OpenSession(openReq);
|
||||
fail("Exception expected.");
|
||||
} catch (Exception e) {
|
||||
verifyJwtAuthMetrics(3, 1);
|
||||
assertEquals(e.getMessage(), "HTTP Response code: 401");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if sessions are authenticated by verifying the JWT token for connections
|
||||
* to the HTTP hiveserver2 endpoint. The JWKS for JWT verification is specified as
|
||||
* HTTPS URL to the statestore Web server. Impala does not verify the certificate of
|
||||
* Web server when downloading JWKS.
|
||||
*/
|
||||
@Test
|
||||
public void testJwtAuthWithJwksHttpsUrl() throws Exception {
|
||||
createJWKSForWebServer_ = true;
|
||||
String certDir = new File(System.getenv("IMPALA_HOME"), "be/src/testutil").getPath();
|
||||
String statestoreWebserverArgs =
|
||||
String.format("--webserver_certificate_file=%s/server-cert.pem "
|
||||
+ "--webserver_private_key_file=%s/server-key.pem "
|
||||
+ "--webserver_interface=localhost --webserver_port=25010 "
|
||||
+ "--hostname=localhost ",
|
||||
certDir, certDir);
|
||||
String jwksHttpUrl = "https://localhost:25010/www/temp_jwks.json";
|
||||
String impaladJwtArgs = String.format("--jwt_token_auth=true "
|
||||
+ "--jwt_validate_signature=true --jwks_url=%s --jwt_allow_without_tls=true ",
|
||||
jwksHttpUrl);
|
||||
setUp(impaladJwtArgs, "", statestoreWebserverArgs);
|
||||
|
||||
THttpClient transport = new THttpClient("http://localhost:28000");
|
||||
Map<String, String> headers = new HashMap<String, String>();
|
||||
|
||||
// Authenticate with valid JWT Token in HTTP header.
|
||||
headers.put("Authorization", "Bearer " + jwtToken_);
|
||||
headers.put("X-Forwarded-For", "127.0.0.1");
|
||||
transport.setCustomHeaders(headers);
|
||||
transport.open();
|
||||
TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
|
||||
|
||||
// Open a session which will get username 'impala' from JWT token and use it as
|
||||
// login user.
|
||||
TOpenSessionReq openReq = new TOpenSessionReq();
|
||||
TOpenSessionResp openResp = client.OpenSession(openReq);
|
||||
// One successful authentication.
|
||||
verifyJwtAuthMetrics(1, 0);
|
||||
// Running a query should succeed.
|
||||
TOperationHandle operationHandle = execAndFetch(
|
||||
client, openResp.getSessionHandle(), "select logged_in_user()", "impala");
|
||||
// Two more successful authentications - for the Exec() and the Fetch().
|
||||
verifyJwtAuthMetrics(3, 0);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,9 @@ public class JwtWebserverTest {
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUp() throws IOException {
|
||||
public void cleanUp() throws Exception {
|
||||
// Leave a cluster running with the default configuration.
|
||||
CustomClusterRunner.StartImpalaCluster();
|
||||
metrics_.Close();
|
||||
}
|
||||
|
||||
|
||||
6
testdata/jwt/jwks_es256.json
vendored
Normal file
6
testdata/jwt/jwks_es256.json
vendored
Normal file
@@ -0,0 +1,6 @@
|
||||
|
||||
{
|
||||
"keys": [
|
||||
{ "kty": "EC", "kid": "public:c424b67b-fe28-45d7-b015-f79da50b5b21", "crv": "P-256", "x": "Qgb5npLHd0Bk61bNnjK632uwmBfrF7I8hoPgaOZjyhg", "y": "fgazwzugi-g_2lv8jzm115u0qWaIJkcBkTnDgN8lJXo" }
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user