mirror of
https://github.com/getredash/redash.git
synced 2025-12-25 01:03:20 -05:00
Allow RSA key used for JWT to be specified as a file path (#6271)
- auth_jwt_auth_public_certs_url may file:// in addition to http/https - Log an error if payload does not contain an email address
This commit is contained in:
@@ -1,9 +1,14 @@
|
||||
import importlib
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import time
|
||||
|
||||
import jwcrypto.jwk
|
||||
import jwt
|
||||
import requests
|
||||
from flask import request
|
||||
from mock import patch
|
||||
from mock import Mock, patch
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from redash import models, settings
|
||||
@@ -11,6 +16,8 @@ from redash.authentication import (
|
||||
api_key_load_user_from_request,
|
||||
get_login_url,
|
||||
hmac_load_user_from_request,
|
||||
jwt_auth,
|
||||
org_settings,
|
||||
sign,
|
||||
)
|
||||
from redash.authentication.google_oauth import (
|
||||
@@ -405,3 +412,69 @@ class TestUserForgotPassword(BaseTestCase):
|
||||
self.assertEqual(response.status_code, 200)
|
||||
send_password_reset_email_mock.assert_not_called()
|
||||
send_user_disabled_email_mock.assert_called_with(user)
|
||||
|
||||
|
||||
class TestJWTAuthentication(BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestJWTAuthentication, self).setUp()
|
||||
self.auth_audience = "My Org"
|
||||
self.auth_issuer = "Admin"
|
||||
self.token_name = "jwt-token"
|
||||
self.rsa_private_key = "/tmp/jwtRS256.key"
|
||||
self.rsa_public_key = "/tmp/jwtRS256.pem"
|
||||
|
||||
if not os.path.exists(self.rsa_public_key):
|
||||
subprocess.check_output(["openssl", "genrsa", "-out", self.rsa_private_key, "4096"])
|
||||
subprocess.check_output(
|
||||
["openssl", "rsa", "-pubout", "-in", self.rsa_private_key, "-out", self.rsa_public_key]
|
||||
)
|
||||
|
||||
org_settings["auth_jwt_login_enabled"] = True
|
||||
org_settings["auth_jwt_auth_public_certs_url"] = "file://{}".format(self.rsa_public_key)
|
||||
org_settings["auth_jwt_auth_issuer"] = self.auth_issuer
|
||||
org_settings["auth_jwt_auth_audience"] = self.auth_audience
|
||||
org_settings["auth_jwt_auth_header_name"] = self.token_name
|
||||
|
||||
def tearDown(self):
|
||||
org_settings["auth_jwt_login_enabled"] = False
|
||||
org_settings["auth_jwt_auth_public_certs_url"] = ""
|
||||
org_settings["auth_jwt_auth_issuer"] = ""
|
||||
org_settings["auth_jwt_auth_audience"] = ""
|
||||
org_settings["auth_jwt_auth_header_name"] = ""
|
||||
|
||||
def test_jwt_no_token(self):
|
||||
response = self.get_request("/data_sources", org=self.factory.org)
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
def test_jwt_from_pem_file(self):
|
||||
user = self.factory.create_user()
|
||||
|
||||
issued_at_timestamp = time.time()
|
||||
expiration_timestamp = issued_at_timestamp + 60
|
||||
|
||||
data = {
|
||||
"aud": self.auth_audience,
|
||||
"email": user.email,
|
||||
"exp": expiration_timestamp,
|
||||
"iat": issued_at_timestamp,
|
||||
"iss": self.auth_issuer,
|
||||
}
|
||||
with open(self.rsa_private_key) as keyfile:
|
||||
sign_key = keyfile.read().strip()
|
||||
token_data = jwt.encode(data, sign_key, algorithm="RS256")
|
||||
|
||||
response = self.get_request("/data_sources", org=self.factory.org, headers={self.token_name: token_data})
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
@patch.object(requests, "get")
|
||||
def test_jwk_decode(self, mock_get):
|
||||
with open(self.rsa_public_key, "rb") as keyfile:
|
||||
public_key = jwcrypto.jwk.JWK.from_pem(keyfile.read())
|
||||
jwk_keys = {"keys": [json.loads(public_key.export())]}
|
||||
|
||||
mockresponse = Mock()
|
||||
mockresponse.json = lambda: jwk_keys
|
||||
mock_get.return_value = mockresponse
|
||||
|
||||
keys = jwt_auth.get_public_keys("http://localhost/key.jwt")
|
||||
self.assertEqual(keys[0].key_size, 4096)
|
||||
|
||||
Reference in New Issue
Block a user