IMPALA-10745: Support Kerberos over HTTP for impala-shell

This patch ports the implementation of GSSAPI authentication over http
transport from Impyla (https://github.com/cloudera/impyla/pull/415) to
impala-shell.

The implementation adds a new dependency on the 'kerberos' Python module,
which is a pip-installed module distributed under the Apache License,
Version 2.0.
When using impala-shell with Kerberos over HTTP, it is assumed that the
host already has a Kerberos ticket cached by kinit, which impala-shell
can pass to the server automatically without requiring the user to
re-enter the password.

Testing:
 - Passed exhaustive tests.
 - Tested manually on a real cluster with a full Kerberos setup.

Change-Id: Ia59ba4004490735162adbd468a00a962165c5abd
Reviewed-on: http://gerrit.cloudera.org:8080/18493
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
wzhou-code
2022-04-27 15:30:51 -07:00
committed by Impala Public Jenkins
parent e6ed98c22b
commit 397d1d15a2
6 changed files with 183 additions and 81 deletions

View File

@@ -40,6 +40,7 @@ impyla == 0.18a1
# six == 1.14.0 (specified separately) # six == 1.14.0 (specified separately)
thrift_sasl == 0.4.3 thrift_sasl == 0.4.3
kazoo == 2.2.1 kazoo == 2.2.1
kerberos == 1.3.1
pexpect == 3.3 pexpect == 3.3
pg8000 == 1.10.2 pg8000 == 1.10.2
prettytable == 0.7.2 prettytable == 0.7.2

View File

@@ -30,7 +30,7 @@ from collections import namedtuple
from six.moves import urllib, http_client from six.moves import urllib, http_client
from thrift.transport.TTransport import TTransportBase from thrift.transport.TTransport import TTransportBase
from shell_exceptions import HttpError from shell_exceptions import HttpError, AuthenticationException
from cookie_util import get_all_matching_cookies, get_cookie_expiry from cookie_util import get_all_matching_cookies, get_cookie_expiry
import six import six
@@ -112,18 +112,27 @@ class ImpalaHttpClient(TTransportBase):
self.realhost = self.realport = self.proxy_auth = None self.realhost = self.realport = self.proxy_auth = None
if (not http_cookie_names) or (str(http_cookie_names).strip() == ""): if (not http_cookie_names) or (str(http_cookie_names).strip() == ""):
self.__http_cookie_dict = None self.__http_cookie_dict = None
self.__auth_cookie_names = None
else: else:
# Build a dictionary that maps cookie name to namedtuple. # Build a dictionary that maps cookie name to namedtuple.
cookie_names = http_cookie_names.split(',') cookie_names = http_cookie_names.split(',')
self.__http_cookie_dict = \ self.__http_cookie_dict = \
{cn: Cookie(cookie=None, expiry_time=None) for cn in cookie_names} {cn: Cookie(cookie=None, expiry_time=None) for cn in cookie_names}
# Store the auth cookie names in __auth_cookie_names.
# Assume auth cookie names end with ".auth".
self.__auth_cookie_names = [cn for cn in cookie_names if cn.endswith(".auth")]
# Set __are_matching_cookies_found as True if matching cookies are found in response. # Set __are_matching_cookies_found as True if matching cookies are found in response.
self.__are_matching_cookies_found = False self.__are_matching_cookies_found = False
self.__wbuf = BytesIO() self.__wbuf = BytesIO()
self.__http = None self.__http = None
self.__http_response = None self.__http_response = None
self.__timeout = socket_timeout_s self.__timeout = socket_timeout_s
# __custom_headers is used to store HTTP headers which are generated in runtime for
# new request.
self.__custom_headers = None self.__custom_headers = None
self.__get_custom_headers_func = None
self.__basic_auth = None
self.__kerb_service = None
@staticmethod @staticmethod
def basic_proxy_auth_header(proxy): def basic_proxy_auth_header(proxy):
@@ -165,27 +174,75 @@ class ImpalaHttpClient(TTransportBase):
else: else:
self.__timeout = ms / 1000.0 self.__timeout = ms / 1000.0
def setCustomHeaders(self, headers): def getCustomHeadersWithBasicAuth(self, cookie_header, has_auth_cookie):
self.__custom_headers = headers custom_headers = {}
if cookie_header:
# Add cookies to HTTP header.
custom_headers['Cookie'] = cookie_header
# Add the 'Authorization' header to the request even if the auth cookie is
# present, to avoid an extra round trip in case the cookie has expired by the
# time the server receives the request. The 'auth' value is calculated only
# once, so this does not cause a performance issue.
custom_headers['Authorization'] = "Basic " + self.__basic_auth
return custom_headers
# Extract cookies from response and save those cookies for which the cookie names def getCustomHeadersWithNegotiateAuth(self, cookie_header, has_auth_cookie):
# are in the cookie name list specified in the __init__(). import kerberos
def extractHttpCookiesFromResponse(self, headers): custom_headers = {}
if self.__http_cookie_dict: if cookie_header:
matching_cookies = \ # Add cookies to HTTP header.
get_all_matching_cookies(self.__http_cookie_dict.keys(), self.path, headers) custom_headers['Cookie'] = cookie_header
if matching_cookies: # For GSSAPI over http we need to dynamically generate custom request headers.
self.__are_matching_cookies_found = True if not has_auth_cookie:
for c in matching_cookies: try:
self.__http_cookie_dict[c.key] = Cookie(c, get_cookie_expiry(c)) _, krb_context = kerberos.authGSSClientInit(self.__kerb_service)
kerberos.authGSSClientStep(krb_context, "")
negotiate_details = kerberos.authGSSClientResponse(krb_context)
custom_headers['Authorization'] = "Negotiate " + negotiate_details
except kerberos.GSSError:
raise AuthenticationException("Kerberos authentication failure.")
return custom_headers
# Return the value as a cookie list for Cookie header. It's a list of name-value def getCustomHeadersWithoutAuth(self, cookie_header, has_auth_cookie):
custom_headers = {}
if cookie_header:
# Add cookies to HTTP header.
custom_headers['Cookie'] = cookie_header
return custom_headers
# Select HTTP Basic authentication (used for LDAP) for subsequent requests.
# 'basic_auth' is stored as-is and later appended to "Basic " to build the
# 'Authorization' header value, so it must already be the encoded credentials
# string (the caller passes the base64 encoding of "<user>:<password>").
def setLdapAuth(self, basic_auth):
# auth mechanism: LDAP
self.__basic_auth = basic_auth
# The headers themselves are generated later, when refreshCustomHeaders()
# invokes this callback.
self.__get_custom_headers_func = self.getCustomHeadersWithBasicAuth
# Select Kerberos (GSSAPI) authentication for subsequent requests.
# 'kerb_service' is the service string handed to kerberos.authGSSClientInit()
# whenever a 'Negotiate' Authorization header has to be generated (the caller
# builds it as "<service name>@<host>").
def setKerberosAuth(self, kerb_service):
# auth mechanism: GSSAPI
self.__kerb_service = kerb_service
# The headers themselves are generated later, when refreshCustomHeaders()
# invokes this callback.
self.__get_custom_headers_func = self.getCustomHeadersWithNegotiateAuth
# Select "no authentication" for subsequent requests: the generated custom
# headers contain at most the 'Cookie' header and never an 'Authorization'
# header.
def setNoneAuth(self):
# auth mechanism: None
self.__get_custom_headers_func = self.getCustomHeadersWithoutAuth
# Rebuild __custom_headers from the saved cookies, using the header generator
# selected by setLdapAuth(), setKerberosAuth() or setNoneAuth().
# If no generator has been selected, __custom_headers is left unchanged.
def refreshCustomHeaders(self):
if self.__get_custom_headers_func:
# cookie_header is the value for the 'Cookie' header (or None);
# has_auth_cookie tells the generator whether an auth cookie is among the
# cookies that will be sent.
cookie_header, has_auth_cookie = self.getHttpCookieHeaderForRequest()
self.__custom_headers = \
self.__get_custom_headers_func(cookie_header, has_auth_cookie)
# Return first value as a cookie list for Cookie header. It's a list of name-value
# pairs in the form of <cookie-name>=<cookie-value>. Pairs in the list are separated by # pairs in the form of <cookie-name>=<cookie-value>. Pairs in the list are separated by
# a semicolon and a space ('; '). # a semicolon and a space ('; ').
# Return second value as True if the cookie list contains auth cookie.
def getHttpCookieHeaderForRequest(self): def getHttpCookieHeaderForRequest(self):
if (self.__http_cookie_dict is None) or not self.__are_matching_cookies_found: if (self.__http_cookie_dict is None) or not self.__are_matching_cookies_found:
return None return None, False
cookie_headers = [] cookie_headers = []
has_auth_cookie = False
for cn, c_tuple in self.__http_cookie_dict.items(): for cn, c_tuple in self.__http_cookie_dict.items():
if c_tuple.cookie: if c_tuple.cookie:
if c_tuple.expiry_time and c_tuple.expiry_time <= datetime.datetime.now(): if c_tuple.expiry_time and c_tuple.expiry_time <= datetime.datetime.now():
@@ -193,18 +250,36 @@ class ImpalaHttpClient(TTransportBase):
else: else:
cookie_header = c_tuple.cookie.output(attrs=['value'], header='').strip() cookie_header = c_tuple.cookie.output(attrs=['value'], header='').strip()
cookie_headers.append(cookie_header) cookie_headers.append(cookie_header)
if not has_auth_cookie and self.__auth_cookie_names \
and cn in self.__auth_cookie_names:
has_auth_cookie = True
if not cookie_headers: if not cookie_headers:
self.__are_matching_cookies_found = False self.__are_matching_cookies_found = False
return None return None, False
else: else:
return '; '.join(cookie_headers) return '; '.join(cookie_headers), has_auth_cookie
# Add HTTP cookie headers based on the saved cookies. # Extract cookies from response and save those cookies for which the cookie names
def addHttpCookiesToRequestHeaders(self): # are in the cookie name list specified in the __init__().
def extractHttpCookiesFromResponse(self):
if self.__http_cookie_dict: if self.__http_cookie_dict:
cookie_headers = self.getHttpCookieHeaderForRequest() matching_cookies = get_all_matching_cookies(
if cookie_headers: self.__http_cookie_dict.keys(), self.path, self.headers)
self.__http.putheader('Cookie', cookie_headers) if matching_cookies:
self.__are_matching_cookies_found = True
for c in matching_cookies:
self.__http_cookie_dict[c.key] = Cookie(c, get_cookie_expiry(c))
# Return the __are_matching_cookies_found flag. It is True once matching
# cookies have been extracted from a response, and False after the cookies
# have been cleaned or when no unexpired cookie was left to send.
def areHttpCookiesSaved(self):
return self.__are_matching_cookies_found
# Forget all saved cookies: clear the __are_matching_cookies_found flag and
# reset every entry of __http_cookie_dict to an empty Cookie tuple.
# Does nothing when no cookie names were configured (__http_cookie_dict is
# None) or when no cookies are currently saved.
def cleanHttpCookies(self):
if (self.__http_cookie_dict is not None) and self.__are_matching_cookies_found:
self.__are_matching_cookies_found = False
# Keep the configured cookie names as keys, but drop their values.
self.__http_cookie_dict = \
{cn: Cookie(cookie=None, expiry_time=None) for cn in self.__http_cookie_dict}
def read(self, sz): def read(self, sz):
return self.__http_response.read(sz) return self.__http_response.read(sz)
@@ -216,57 +291,73 @@ class ImpalaHttpClient(TTransportBase):
self.__wbuf.write(buf) self.__wbuf.write(buf)
def flush(self): def flush(self):
if self.isOpen(): # Send HTTP request and receive response.
self.close() # Return True if the client should retry this method.
self.open() def sendRequestRecvResp(data):
if self.isOpen():
self.close()
self.open()
# HTTP request
if self.using_proxy() and self.scheme == "http":
# need full URL of real host for HTTP proxy here (HTTPS uses CONNECT tunnel)
self.__http.putrequest('POST', "http://%s:%s%s" %
(self.realhost, self.realport, self.path))
else:
self.__http.putrequest('POST', self.path)
# Write headers
self.__http.putheader('Content-Type', 'application/x-thrift')
data_len = len(data)
self.__http.putheader('Content-Length', str(data_len))
if data_len > ImpalaHttpClient.MIN_REQUEST_SIZE_FOR_EXPECT:
# Add the 'Expect' header to large requests. Note that we do not explicitly wait
# for the '100 continue' response before sending the data - HTTPConnection simply
# ignores these types of responses, but we'll get the right behavior anyways.
self.__http.putheader("Expect", "100-continue")
if self.using_proxy() and self.scheme == "http" and self.proxy_auth is not None:
self.__http.putheader("Proxy-Authorization", self.proxy_auth)
self.refreshCustomHeaders()
if not self.__custom_headers or 'User-Agent' not in self.__custom_headers:
user_agent = 'Python/ImpalaHttpClient'
script = os.path.basename(sys.argv[0])
if script:
user_agent = '%s (%s)' % (user_agent, urllib.parse.quote(script))
self.__http.putheader('User-Agent', user_agent)
if self.__custom_headers:
for key, val in six.iteritems(self.__custom_headers):
self.__http.putheader(key, val)
self.__http.endheaders()
# Write payload
self.__http.send(data)
# Get reply to flush the request
self.__http_response = self.__http.getresponse()
self.code = self.__http_response.status
self.message = self.__http_response.reason
self.headers = self.__http_response.msg
# A '401 Unauthorized' response might mean that we tried cookie-based
# authentication with one or more expired cookies.
# Delete the cookies and try again.
if self.code == 401 and self.areHttpCookiesSaved():
self.cleanHttpCookies()
return True
else:
self.extractHttpCookiesFromResponse()
return False
# Pull data out of buffer # Pull data out of buffer
data = self.__wbuf.getvalue() data = self.__wbuf.getvalue()
self.__wbuf = BytesIO() self.__wbuf = BytesIO()
# HTTP request retry = sendRequestRecvResp(data)
if self.using_proxy() and self.scheme == "http": if retry:
# need full URL of real host for HTTP proxy here (HTTPS uses CONNECT tunnel) # Received "401 Unauthorized" response. Delete HTTP cookies and then retry.
self.__http.putrequest('POST', "http://%s:%s%s" % sendRequestRecvResp(data)
(self.realhost, self.realport, self.path))
else:
self.__http.putrequest('POST', self.path)
# Write headers
self.__http.putheader('Content-Type', 'application/x-thrift')
data_len = len(data)
self.__http.putheader('Content-Length', str(data_len))
if data_len > ImpalaHttpClient.MIN_REQUEST_SIZE_FOR_EXPECT:
# Add the 'Expect' header to large requests. Note that we do not explicitly wait
# for the '100 continue' response before sending the data - HTTPConnection simply
# ignores these types of responses, but we'll get the right behavior anyways.
self.__http.putheader("Expect", "100-continue")
if self.using_proxy() and self.scheme == "http" and self.proxy_auth is not None:
self.__http.putheader("Proxy-Authorization", self.proxy_auth)
if not self.__custom_headers or 'User-Agent' not in self.__custom_headers:
user_agent = 'Python/ImpalaHttpClient'
script = os.path.basename(sys.argv[0])
if script:
user_agent = '%s (%s)' % (user_agent, urllib.parse.quote(script))
self.__http.putheader('User-Agent', user_agent)
if self.__custom_headers:
for key, val in six.iteritems(self.__custom_headers):
self.__http.putheader(key, val)
self.addHttpCookiesToRequestHeaders()
self.__http.endheaders()
# Write payload
self.__http.send(data)
# Get reply to flush the request
self.__http_response = self.__http.getresponse()
self.code = self.__http_response.status
self.message = self.__http_response.reason
self.headers = self.__http_response.msg
self.extractHttpCookiesFromResponse(self.headers)
if self.code >= 300: if self.code >= 300:
# Report any http response code that is not 1XX (informational response) or # Report any http response code that is not 1XX (informational response) or

View File

@@ -388,14 +388,6 @@ class ImpalaClient(object):
# timeout so that in case of any connection errors, the client retries have a better # timeout so that in case of any connection errors, the client retries have a better
# chance of succeeding. # chance of succeeding.
# HTTP server implemententations do not support SPNEGO yet.
# TODO: when we add support for Kerberos+HTTP, we need to re-enable the automatic
# kerberos retry logic in impala_shell.py that was disabled for HTTP because of
# IMPALA-8932.
if self.use_kerberos or self.kerberos_host_fqdn:
print("Kerberos not supported with HTTP endpoints.", file=sys.stderr)
raise NotImplementedError()
host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port) host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port)
assert self.http_path assert self.http_path
# ImpalaHttpClient relies on the URI scheme (http vs https) to open an appropriate # ImpalaHttpClient relies on the URI scheme (http vs https) to open an appropriate
@@ -417,10 +409,20 @@ class ImpalaClient(object):
socket_timeout_s=self.http_socket_timeout_s) socket_timeout_s=self.http_socket_timeout_s)
if self.use_ldap: if self.use_ldap:
# Set the BASIC auth header # Set the BASIC authorization
user_passwd = "{0}:{1}".format(self.user, self.ldap_password) user_passwd = "{0}:{1}".format(self.user, self.ldap_password)
auth = base64.encodestring(user_passwd.encode()).decode().strip('\n') auth = base64.encodestring(user_passwd.encode()).decode().strip('\n')
transport.setCustomHeaders({"Authorization": "Basic {0}".format(auth)}) transport.setLdapAuth(auth)
elif self.use_kerberos or self.kerberos_host_fqdn:
# Set the Kerberos service
if self.kerberos_host_fqdn is not None:
kerb_host = self.kerberos_host_fqdn.split(':')[0].encode('ascii', 'ignore')
else:
kerb_host = self.impalad_host
kerb_service = "{0}@{1}".format(self.kerberos_service_name, kerb_host)
transport.setKerberosAuth(kerb_service)
else:
transport.setNoneAuth()
# Without buffering Thrift would call socket.recv() each time it deserializes # Without buffering Thrift would call socket.recv() each time it deserializes
# something (e.g. a member in a struct). # something (e.g. a member in a struct).

View File

@@ -953,8 +953,7 @@ class ImpalaShell(cmd.Cmd, object):
# If the connection fails and the Kerberos has not been enabled, # If the connection fails and the Kerberos has not been enabled,
# check for a valid kerberos ticket and retry the connection # check for a valid kerberos ticket and retry the connection
# with kerberos enabled. # with kerberos enabled.
# IMPALA-8932: Kerberos is not yet supported for hs2-http, so don't retry. if not self.imp_client.connected and not self.use_kerberos:
if not self.imp_client.connected and not self.use_kerberos and protocol != 'hs2-http':
try: try:
if call(["klist", "-s"]) == 0: if call(["klist", "-s"]) == 0:
print("Kerberos ticket found in the credentials cache, retrying " print("Kerberos ticket found in the credentials cache, retrying "

View File

@@ -1,5 +1,6 @@
bitarray==2.3.0 bitarray==2.3.0
configparser==4.0.2 configparser==4.0.2
kerberos==1.3.1
prettytable==0.7.2 prettytable==0.7.2
sasl==0.2.1 sasl==0.2.1
setuptools>=36.8.0 setuptools>=36.8.0

View File

@@ -41,6 +41,14 @@ class DisconnectedException(Exception):
return self.value return self.value
class AuthenticationException(Exception):
  """Raised when authentication with the server fails.

  The human-readable reason is available as the 'value' attribute and is also
  what str() returns for the exception.
  """

  def __init__(self, value=""):
    # Forward the message to Exception so that 'args' is populated; otherwise
    # the message is lost when the exception is pickled/copied or inspected
    # through e.args by generic error handling code.
    super(AuthenticationException, self).__init__(value)
    self.value = value

  def __str__(self):
    # __str__ must return a string; convert so that a non-string value (e.g. a
    # wrapped exception object) does not raise TypeError while being printed.
    return str(self.value)
class QueryCancelledByShellException(Exception): pass class QueryCancelledByShellException(Exception): pass