diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc index 181741664..f8cab8e48 100644 --- a/be/src/rpc/thrift-server.cc +++ b/be/src/rpc/thrift-server.cc @@ -270,9 +270,10 @@ ThriftServer::ThriftServer(const string& name, const std::shared_ptr& processor, int port, AuthProvider* auth_provider, MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms, int64_t idle_poll_period_ms, TransportType transport_type, - bool is_external_facing) + bool is_external_facing, string host) : started_(false), port_(port), + host_(std::move(host)), ssl_enabled_(false), max_concurrent_connections_(max_concurrent_connections), queue_timeout_ms_(queue_timeout_ms), @@ -342,7 +343,7 @@ Status ThriftServer::CreateSocket(std::shared_ptr* socket) { socket_factory->loadCertificate(certificate_path_.c_str()); socket_factory->loadPrivateKey(private_key_path_.c_str()); ImpalaKeepAliveServerSocket* server_socket = - new ImpalaKeepAliveServerSocket(port_, socket_factory); + new ImpalaKeepAliveServerSocket(host_, port_, socket_factory); server_socket->setKeepAliveOptions(keepalive_probe_period_s_, keepalive_retry_period_s_, keepalive_retry_count_); socket->reset(server_socket); @@ -351,7 +352,7 @@ Status ThriftServer::CreateSocket(std::shared_ptr* socket) { } } else { ImpalaKeepAliveServerSocket* server_socket = - new ImpalaKeepAliveServerSocket(port_); + new ImpalaKeepAliveServerSocket(host_, port_); server_socket->setKeepAliveOptions(keepalive_probe_period_s_, keepalive_retry_period_s_, keepalive_retry_count_); socket->reset(server_socket); diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h index 820526ca1..a6fe327a7 100644 --- a/be/src/rpc/thrift-server.h +++ b/be/src/rpc/thrift-server.h @@ -309,7 +309,7 @@ class ThriftServer { int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0, TransportType server_transport = TransportType::BINARY, - bool is_external_facing = true); + bool 
is_external_facing = true, std::string host = ""); /// Enables secure access over SSL. Must be called before Start(). The first three /// arguments are the minimum SSL/TLS version, and paths to certificate and private key @@ -345,6 +345,9 @@ class ThriftServer { /// replaced with whatever port number the server is listening on. int port_; + /// The host name to bind with. + string host_; + /// True if the server socket only accepts SSL connections bool ssl_enabled_; @@ -546,6 +549,11 @@ class ThriftServerBuilder { return *this; } + ThriftServerBuilder& host(const string& host) { + host_ = host; + return *this; + } + /// Constructs a new ThriftServer and puts it in 'server', if construction was /// successful, returns an error otherwise. In the error case, 'server' will not have /// been set and will not need to be freed, otherwise the caller assumes ownership of @@ -554,7 +562,7 @@ class ThriftServerBuilder { std::unique_ptr ptr( new ThriftServer(name_, processor_, port_, auth_provider_, metrics_, max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_, - server_transport_type_, is_external_facing_)); + server_transport_type_, is_external_facing_, host_)); if (enable_ssl_) { RETURN_IF_ERROR(ptr->EnableSsl( version_, certificate_, private_key_, pem_password_cmd_, cipher_list_, @@ -572,6 +580,7 @@ class ThriftServerBuilder { int max_concurrent_connections_ = 0; std::string name_; std::shared_ptr processor_; + std::string host_; int port_ = 0; ThriftServer::TransportType server_transport_type_ = ThriftServer::TransportType::BINARY; diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index a8bfd5ace..9584405ad 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -157,6 +157,9 @@ DEFINE_bool(enable_external_fe_http, false, "if true enables http transport for external_fe_port otherwise binary transport is " "used"); +DEFINE_string(external_interface, "", + "Host name to bind with in 
beeswax/hs2/hs2-http. \"::\" allows IPv6 (dual stack)."); + DEFINE_int32(fe_service_threads, 64, "number of threads available to serve client requests"); DEFINE_string(default_query_options, "", "key=value pair of default query options for" @@ -3238,6 +3241,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, .keepalive(FLAGS_client_keepalive_probe_period_s, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) + .host(FLAGS_external_interface) .Build(&server)); beeswax_server_.reset(server); beeswax_server_->SetConnectionHandler(this); @@ -3270,6 +3274,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, .keepalive(FLAGS_client_keepalive_probe_period_s, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) + .host(FLAGS_external_interface) .Build(&server)); hs2_server_.reset(server); hs2_server_->SetConnectionHandler(this); @@ -3305,6 +3310,8 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) .Build(&server)); + // FLAGS_external_interface is not passed to external external_fe_port. If this is + // needed (e.g. for dual stack) then another subtask can be added to IMPALA-13819. external_fe_server_.reset(server); external_fe_server_->SetConnectionHandler(this); } @@ -3338,6 +3345,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, .keepalive(FLAGS_client_keepalive_probe_period_s, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) + .host(FLAGS_external_interface) .Build(&http_server)); hs2_http_server_.reset(http_server); hs2_http_server_->SetConnectionHandler(this); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 4468f3ba7..e254620b4 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -163,6 +163,8 @@ add_dependencies(Util gen-deps) # Squeasel requires C99 compatibility to build. 
SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c PROPERTIES COMPILE_FLAGS -std=c99) +SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c + PROPERTIES COMPILE_DEFINITIONS USE_IPV6) # shared library which provides native logging support to JVMs over JNI. add_library(loggingsupport SHARED diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index b59cd408b..61b499eab 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -53,6 +53,7 @@ using std::random_device; namespace impala { const string LOCALHOST_IP_STR("127.0.0.1"); +const string LOCALHOST_IP_V6_STR("::1"); Status GetHostname(string* hostname) { char name[HOST_NAME_MAX]; @@ -67,33 +68,44 @@ Status GetHostname(string* hostname) { return Status::OK(); } -Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){ +Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip, bool ipv6){ // Try to resolve via the operating system. vector addresses; addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_INET; // IPv4 addresses only + hints.ai_family = ipv6 ? AF_INET6 : AF_INET; hints.ai_socktype = SOCK_STREAM; struct addrinfo* addr_info; if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) { stringstream ss; - ss << "Could not find IPv4 address for: " << hostname; + ss << "Could not find IPv" << (ipv6 ? 
6 : 4) << " address for: " << hostname; return Status(ss.str()); } addrinfo* it = addr_info; while (it != NULL) { - char addr_buf[64]; - const char* result = - inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 64); + char addr_buf[INET6_ADDRSTRLEN]; + const char* result = nullptr; + bool is_ipv6_addr = it->ai_family == AF_INET6; + if (is_ipv6_addr) { + result = inet_ntop(AF_INET6, &((sockaddr_in6*)it->ai_addr)->sin6_addr, + addr_buf, sizeof(addr_buf)); + } else { + result = inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, + addr_buf, sizeof(addr_buf)); + } + if (result == NULL) { stringstream ss; - ss << "Could not convert IPv4 address for: " << hostname; + ss << "Could not convert IPv" << (is_ipv6_addr ? 6 : 4) + << " address for: " << hostname; freeaddrinfo(addr_info); return Status(ss.str()); } - addresses.push_back(string(addr_buf)); + if (is_ipv6_addr == ipv6) { + addresses.push_back(string(addr_buf)); + } it = it->ai_next; } @@ -101,7 +113,7 @@ Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){ if (addresses.empty()) { stringstream ss; - ss << "Could not convert IPv4 address for: " << hostname; + ss << "Could not convert IPv" << (ipv6 ? 
6 : 4) << " address for: " << hostname; return Status(ss.str()); } @@ -130,7 +142,7 @@ bool IsResolvedAddress(const NetworkAddressPB& addr) { bool FindFirstNonLocalhost(const vector& addresses, string* addr) { for (const string& candidate: addresses) { - if (candidate != LOCALHOST_IP_STR) { + if (candidate != LOCALHOST_IP_STR && candidate != LOCALHOST_IP_V6_STR) { *addr = candidate; return true; } @@ -226,7 +238,13 @@ bool IsWildcardAddress(const string& ipaddress) { string TNetworkAddressToString(const TNetworkAddress& address) { stringstream ss; - ss << address.hostname << ":" << dec << address.port; + if (address.hostname.find(':') == string::npos) { + // IPv4 + ss << address.hostname << ":" << dec << address.port; + } else { + // IPv6 + ss << "[" << address.hostname << "]:" << dec << address.port; + } return ss.str(); } diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index 0facbb191..88071cc6a 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -42,9 +42,11 @@ bool IsResolvedAddress(const NetworkAddressPB& addr); /// Looks up all IP addresses associated with a given hostname and returns one of them via /// 'address'. If the IP addresses of a host don't change, then subsequent calls will -/// always return the same address. Returns an error status if any system call failed, -/// otherwise OK. Even if OK is returned, addresses may still be of zero length. -Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) WARN_UNUSED_RESULT; +/// always return the same address. Returns an error status if any system call failed or +/// no address was found, otherwise OK. +/// Returns only ipv6 addresses if 'ipv6' is true, otherwise only ipv4 addresses. +Status HostnameToIpAddr( + const Hostname& hostname, IpAddr* ip, bool ipv6=false) WARN_UNUSED_RESULT; /// Finds the first non-localhost IP address in the given list. Returns /// true if such an address was found, false otherwise. 
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc index 2d8ac1ac5..be7e3300d 100644 --- a/be/src/util/webserver.cc +++ b/be/src/util/webserver.cc @@ -408,17 +408,25 @@ bool Webserver::IsSecure() const { Status Webserver::Start() { LOG(INFO) << "Starting webserver on " << TNetworkAddressToString(http_address_); - IpAddr ip; - RETURN_IF_ERROR(HostnameToIpAddr(http_address_.hostname, &ip)); - stringstream listening_spec; - listening_spec << ip << ":" << http_address_.port; + IpAddr ipv4, ipv6; + Status ip_v6_status = HostnameToIpAddr(http_address_.hostname, &ipv6, true); + Status ip_v4_status = HostnameToIpAddr(http_address_.hostname, &ipv4, false); + if (!ip_v4_status.ok() && !ip_v6_status.ok()) return ip_v6_status; - if (IsSecure()) { - LOG(INFO) << "Webserver: Enabling HTTPS support"; - // Squeasel makes sockets with 's' suffixes accept SSL traffic only - listening_spec << "s"; + if (IsSecure()) LOG(INFO) << "Webserver: Enabling HTTPS support"; + stringstream listening_spec; + if (ip_v6_status.ok()) { + listening_spec << "[" << ipv6 << "]:" << http_address_.port; + if (IsSecure()) listening_spec << "s"; + } + if (ip_v4_status.ok()) { + if (ip_v6_status.ok()) listening_spec << ","; + listening_spec << ipv4 << ":" << http_address_.port; + if (IsSecure()) listening_spec << "s"; } string listening_str = listening_spec.str(); + LOG(INFO) << "Starting webserver listening to " << listening_str; + vector options; if (!FLAGS_webserver_doc_root.empty() && FLAGS_enable_webserver_doc_root) { diff --git a/shell/impala_shell/impala_client.py b/shell/impala_shell/impala_client.py index 8e6164746..7fae660bd 100755 --- a/shell/impala_shell/impala_client.py +++ b/shell/impala_shell/impala_client.py @@ -429,8 +429,7 @@ class ImpalaClient(object): # symptoms in case of a problematic remote endpoint. It's better to have a finite # timeout so that in case of any connection errors, the client retries have a better # chance of succeeding. 
- - host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port) + host_and_port = self._to_host_port(self.impalad_host, self.impalad_port) assert self.http_path # ImpalaHttpClient relies on the URI scheme (http vs https) to open an appropriate # connection to the server. @@ -582,6 +581,13 @@ class ImpalaClient(object): num_deleted_rows = sum([int(k) for k in dml_result.rows_deleted.values()]) return (num_rows, num_deleted_rows, dml_result.num_row_errors) + @staticmethod + def _to_host_port(host, port): + # Wrap ipv6 addresses in brackets. + is_ipv6_address = ":" in host + fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}" + return fmt.format(host, port) + class ImpalaHS2Client(ImpalaClient): """Impala client. Uses the HS2 protocol plus Impala-specific extensions.""" diff --git a/shell/impala_shell/impala_shell.py b/shell/impala_shell/impala_shell.py index 6de925ccb..2445e8629 100755 --- a/shell/impala_shell/impala_shell.py +++ b/shell/impala_shell/impala_shell.py @@ -1001,6 +1001,31 @@ class ImpalaShell(cmd.Cmd, object): """Exit the impala shell""" return self.do_quit(args) + @staticmethod + def __parse_host_port(addr): + """Checks if the host name also contains a port and separates the two. + Returns either [host] or [host, port]. Detects if host is an ipv6 address like "[::]" + and removes the brackets from it. + """ + split_by_colon = addr.split(':') + ipv6addr = len(split_by_colon) > 2 + host_port = None + if ipv6addr: + if addr[0] == "[": + parts = addr.split("]") + host_port = [parts[0][1:]] + has_port = parts[1] != "" + if has_port: + if parts[1][0] != ":": return None + host_port.append(parts[1][1:]) + else: + host_port = [addr] + else: + host_port = [val for val in split_by_colon if val.strip()] + # validate the connection string. 
+ if ':' in addr and len(host_port) != 2: return None + return host_port + def do_connect(self, args): """Connect to an Impalad instance: Usage: connect, defaults to the fqdn of the localhost and the protocol's default port @@ -1023,10 +1048,10 @@ class ImpalaShell(cmd.Cmd, object): if not args: args = socket.getfqdn() tokens = args.split(" ") - # validate the connection string. - host_port = [val for val in tokens[0].split(':') if val.strip()] + addr = tokens[0] + host_port = self.__parse_host_port(addr) protocol = options.protocol.lower() - if (':' in tokens[0] and len(host_port) != 2): + if not host_port: print("Connection string must either be empty, or of the form " "", file=sys.stderr) return CmdStatus.ERROR diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py index ac0eba84c..0ccc78159 100644 --- a/tests/beeswax/impala_beeswax.py +++ b/tests/beeswax/impala_beeswax.py @@ -35,6 +35,7 @@ import sys import time from builtins import filter, map + from thrift.protocol import TBinaryProtocol from thrift.Thrift import TApplicationException from thrift.transport.TTransport import TTransportException @@ -42,6 +43,7 @@ from thrift.transport.TTransport import TTransportException from impala_thrift_gen.beeswax import BeeswaxService from impala_thrift_gen.beeswax.BeeswaxService import QueryState from impala_thrift_gen.ImpalaService import ImpalaService +from tests.common.network import split_host_port from tests.util.thrift_util import create_transport LOG = logging.getLogger('impala_beeswax') @@ -115,12 +117,9 @@ class ImpalaBeeswaxClient(object): def __init__(self, impalad, use_kerberos=False, user=None, password=None, use_ssl=False): self.connected = False - split_impalad = impalad.split(":") - assert len(split_impalad) in [1, 2] - self.impalad_host = split_impalad[0] - self.impalad_port = 21000 # Default beeswax port - if len(split_impalad) == 2: - self.impalad_port = int(split_impalad[1]) + host, port = split_host_port(impalad) + 
self.impalad_host = host + self.impalad_port = port if port else 21000 # Default beeswax port self.imp_service = None self.transport = None self.use_kerberos = use_kerberos @@ -170,7 +169,7 @@ class ImpalaBeeswaxClient(object): def close_connection(self): """Close the transport if it's still open""" - if self.transport: + if self.transport and self.connected: self.transport.close() self.connected = False diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index f39072050..5cf512c70 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -614,6 +614,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): # Failure tests expect cluster to be initialised even if start-impala-cluster fails. cls.cluster = ImpalaCluster.get_e2e_test_cluster() + cls.impalad_test_service = cls.create_impala_service() PREVIOUS_CMD_STR = cmd_str @@ -650,3 +651,9 @@ class CustomClusterTestSuite(ImpalaTestSuite): for impalad in cls.cluster.impalads: impalad.service.wait_for_num_known_live_backends(expected_num_impalads, timeout=impalad_timeout_s) + + @classmethod + def create_impala_service(cls): + """Override ImpalaTestSuite to return 1st impalad of custom cluster. + Returns None if no impalad was started.""" + return cls.cluster.impalads[0].service if cls.cluster.impalads else None diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index c6c44bc76..96c436735 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -570,14 +570,21 @@ class BaseImpalaProcess(Process): def _get_webserver_certificate_file(self): # TODO: if this is containerised, the path will likely not be the same on the host. 
+ # TODO: what we need in the client is the CA, not the server cert return self._get_arg_value("webserver_certificate_file", "") + def _get_ssl_client_ca_certificate(self): + return self._get_arg_value("ssl_client_ca_certificate", "") + def _get_hostname(self): return self._get_arg_value("hostname", socket.gethostname()) def _get_webserver_interface(self): return self._get_arg_value("webserver_interface", socket.gethostname()) + def _get_external_interface(self): + return self._get_arg_value("external_interface", socket.gethostname()) + def _get_arg_value(self, arg_name, default=None): """Gets the argument value for given argument name""" for arg in self.cmd: @@ -600,10 +607,12 @@ class BaseImpalaProcess(Process): class ImpaladProcess(BaseImpalaProcess): def __init__(self, cmd, container_id=None, port_map=None): super(ImpaladProcess, self).__init__(cmd, container_id, port_map) + self.external_interface = self._get_external_interface() self.service = ImpaladService(self.hostname, self.webserver_interface, + self.external_interface, self.get_webserver_port(), self.__get_beeswax_port(), self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(), - self._get_webserver_certificate_file()) + self._get_webserver_certificate_file(), self._get_ssl_client_ca_certificate()) def _get_default_webserver_port(self): return DEFAULT_IMPALAD_WEBSERVER_PORT @@ -685,7 +694,7 @@ class StateStoreProcess(BaseImpalaProcess): super(StateStoreProcess, self).__init__(cmd, container_id, port_map) self.service = StateStoredService(self.hostname, self.webserver_interface, self.get_webserver_port(), self._get_webserver_certificate_file(), - self.__get_port()) + self._get_ssl_client_ca_certificate(), self.__get_port()) def _get_default_webserver_port(self): return DEFAULT_STATESTORED_WEBSERVER_PORT @@ -712,6 +721,7 @@ class CatalogdProcess(BaseImpalaProcess): super(CatalogdProcess, self).__init__(cmd, container_id, port_map) self.service = CatalogdService(self.hostname, 
self.webserver_interface, self.get_webserver_port(), self._get_webserver_certificate_file(), + self._get_ssl_client_ca_certificate(), self.__get_port()) def _get_default_webserver_port(self): @@ -743,7 +753,8 @@ class AdmissiondProcess(BaseImpalaProcess): def __init__(self, cmd, container_id=None, port_map=None): super(AdmissiondProcess, self).__init__(cmd, container_id, port_map) self.service = AdmissiondService(self.hostname, self.webserver_interface, - self.get_webserver_port(), self._get_webserver_certificate_file()) + self.get_webserver_port(), self._get_webserver_certificate_file(), + self._get_ssl_client_ca_certificate()) def _get_default_webserver_port(self): return DEFAULT_ADMISSIOND_WEBSERVER_PORT diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index f57c1f3a3..9aad86e66 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -40,6 +40,7 @@ from tests.beeswax.impala_beeswax import ( ImpalaBeeswaxException, ) import tests.common +from tests.common.network import split_host_port from tests.common.patterns import LOG_FORMAT from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP from tests.util.thrift_util import op_handle_to_query_id, session_handle_to_session_id @@ -610,13 +611,13 @@ class ImpylaHS2Connection(ImpalaConnection): return self.__cursor def connect(self): - host, port = self.__host_port.split(":") + host, port = split_host_port(self.__host_port) conn_kwargs = {} if self._is_hive: conn_kwargs['auth_mechanism'] = 'PLAIN' try: self.__impyla_conn = impyla.connect( - host=host, port=int(port), use_http_transport=self.__use_http_transport, + host=host, port=port, use_http_transport=self.__use_http_transport, http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs) self.log_client("connected to {0} with impyla {1}".format( self.__host_port, self.get_test_protocol())) diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index ae9707ff8..fe808a499 
100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -35,6 +35,7 @@ from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport from tests.common.impala_connection import create_connection, create_ldap_connection +from tests.common.network import to_host_port, CERT_TO_CA_MAP from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP LOG = logging.getLogger('impala_service') @@ -48,7 +49,7 @@ WEBSERVER_PASSWORD = os.environ.get('IMPALA_WEBSERVER_PASSWORD', None) # TODO: Refactor the retry/timeout logic into a common place. class BaseImpalaService(object): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file): + webserver_certificate_file, ssl_client_ca_certificate_file): self.hostname = hostname self.webserver_interface = webserver_interface if webserver_interface == "": @@ -56,6 +57,7 @@ class BaseImpalaService(object): self.webserver_interface = hostname self.webserver_port = webserver_port self.webserver_certificate_file = webserver_certificate_file + self.ssl_client_ca_certificate_file = ssl_client_ca_certificate_file self.webserver_username_password = None if WEBSERVER_USERNAME is not None and WEBSERVER_PASSWORD is not None: self.webserver_username_password = (WEBSERVER_USERNAME, WEBSERVER_PASSWORD) @@ -68,10 +70,14 @@ class BaseImpalaService(object): protocol = "http" if self.webserver_certificate_file != "": protocol = "https" - url = "%s://%s:%d/%s" % \ - (protocol, self.webserver_interface, int(self.webserver_port), page_name) - return requests.get(url, verify=self.webserver_certificate_file, - auth=self.webserver_username_password) + host_port = to_host_port(self.webserver_interface, self.webserver_port) + url = "%s://%s/%s" % (protocol, host_port, page_name) + cert = self.webserver_certificate_file + # Instead of cert use its CA cert if available. 
+ file_part = cert.split("/")[-1] + if file_part in CERT_TO_CA_MAP: + cert = cert.replace(file_part, CERT_TO_CA_MAP[file_part]) + return requests.get(url, verify=cert, auth=self.webserver_username_password) except Exception as e: LOG.info("Debug webpage not yet available: %s", str(e)) sleep(interval) @@ -260,11 +266,14 @@ class BaseImpalaService(object): # Allows for interacting with an Impalad instance to perform operations such as creating # new connections or accessing the debug webpage. class ImpaladService(BaseImpalaService): - def __init__(self, hostname, webserver_interface="", webserver_port=25000, - beeswax_port=21000, krpc_port=27000, hs2_port=21050, - hs2_http_port=28000, webserver_certificate_file=""): + def __init__(self, hostname, webserver_interface="", external_interface="", + webserver_port=25000, beeswax_port=21000, krpc_port=27000, hs2_port=21050, + hs2_http_port=28000, webserver_certificate_file="", + ssl_client_ca_certificate_file=""): super(ImpaladService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) + self.external_interface = external_interface if external_interface else hostname self.beeswax_port = beeswax_port self.krpc_port = krpc_port self.hs2_port = hs2_port @@ -444,30 +453,32 @@ class ImpaladService(BaseImpalaService): sleep(interval) return False - def is_port_open(self, port): + def use_ssl_for_clients(self): + return self.ssl_client_ca_certificate_file != "" + + def is_port_open(self, host, port): try: - sock = socket.create_connection((self.hostname, port), timeout=1) + sock = socket.create_connection((host, port), timeout=1) sock.close() return True except Exception: return False def webserver_port_is_open(self): - return self.is_port_open(self.webserver_port) + return self.is_port_open(self.webserver_interface, self.webserver_port) def create_beeswax_client(self, 
use_kerberos=False): """Creates a new beeswax client connection to the impalad. DEPRECATED: Use create_hs2_client() instead.""" - LOG.warning('beeswax protocol is deprecated.') - client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), - use_kerberos, BEESWAX) + client = create_connection(to_host_port(self.external_interface, self.beeswax_port), + use_kerberos, BEESWAX, use_ssl=self.use_ssl_for_clients()) client.connect() return client def beeswax_port_is_open(self): """Test if the beeswax port is open. Does not need to authenticate.""" # Check if the port is open first to avoid chatty logging of Thrift connection. - if not self.is_port_open(self.beeswax_port): return False + if not self.is_port_open(self.external_interface, self.beeswax_port): return False try: # The beeswax client will connect successfully even if not authenticated. @@ -475,31 +486,31 @@ class ImpaladService(BaseImpalaService): client.close() return True except Exception as e: - LOG.info(e) return False def create_ldap_beeswax_client(self, user, password, use_ssl=False): - client = create_ldap_connection('%s:%d' % (self.hostname, self.beeswax_port), + client = create_ldap_connection(to_host_port(self.hostname, self.beeswax_port), user=user, password=password, use_ssl=use_ssl) client.connect() return client def create_hs2_client(self, user=None): """Creates a new HS2 client connection to the impalad""" - client = create_connection('%s:%d' % (self.hostname, self.hs2_port), - protocol=HS2, user=user) + client = create_connection(to_host_port(self.external_interface, self.hs2_port), + protocol=HS2, user=user, + use_ssl=self.use_ssl_for_clients()) client.connect() return client def hs2_port_is_open(self): """Test if the HS2 port is open. Does not need to authenticate.""" # Check if the port is open first to avoid chatty logging of Thrift connection. 
- if not self.is_port_open(self.hs2_port): return False + if not self.is_port_open(self.external_interface, self.hs2_port): return False # Impyla will try to authenticate as part of connecting, so preserve previous logic # that uses the HS2 thrift code directly. try: - sock = TSocket(self.hostname, self.hs2_port) + sock = TSocket(self.external_interface, self.hs2_port) transport = TBufferedTransport(sock) transport.open() transport.close() @@ -510,7 +521,7 @@ class ImpaladService(BaseImpalaService): def hs2_http_port_is_open(self): # Only check if the port is open, do not create Thrift transport. - return self.is_port_open(self.hs2_http_port) + return self.is_port_open(self.external_interface, self.hs2_http_port) def create_client(self, protocol): """Creates a new client connection for given protocol to this impalad""" @@ -520,7 +531,8 @@ class ImpaladService(BaseImpalaService): if protocol == BEESWAX: LOG.warning('beeswax protocol is deprecated.') port = self.beeswax_port - client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol) + client = create_connection(to_host_port(self.external_interface, port), + protocol=protocol) client.connect() return client @@ -537,9 +549,10 @@ class ImpaladService(BaseImpalaService): # accessing the debug webpage. class StateStoredService(BaseImpalaService): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file, service_port): + webserver_certificate_file, ssl_client_ca_certificate_file, service_port): super(StateStoredService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) self.service_port = service_port def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1): @@ -554,9 +567,10 @@ class StateStoredService(BaseImpalaService): # accessing the debug webpage. 
class CatalogdService(BaseImpalaService): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file, service_port): + webserver_certificate_file, ssl_client_ca_certificate_file, service_port): super(CatalogdService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) self.service_port = service_port def get_catalog_version(self, timeout=10, interval=1): @@ -579,6 +593,7 @@ class CatalogdService(BaseImpalaService): class AdmissiondService(BaseImpalaService): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file): + webserver_certificate_file, ssl_client_ca_certificate_file): super(AdmissiondService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 0ae933bbd..77a5bc253 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -49,6 +49,7 @@ from tests.common.environ import ( from tests.common.errors import Timeout from tests.common.impala_connection import create_connection from tests.common.impala_service import ImpaladService +from tests.common.network import to_host_port from tests.common.test_dimensions import ( ALL_BATCH_SIZES, ALL_DISABLE_CODEGEN_OPTIONS, @@ -265,7 +266,11 @@ class ImpalaTestSuite(BaseTestSuite): cls.hs2_http_client = None cls.hive_client = None cls.hive_transport = None + + # In case of custom cluster tests this returns the 1st impalad or None if nothing + # is started. 
cls.impalad_test_service = cls.create_impala_service() + ImpalaTestSuite.impalad_test_service = cls.impalad_test_service # Override the shell history path so that commands run by any tests # don't write any history into the developer's file. @@ -377,10 +382,12 @@ class ImpalaTestSuite(BaseTestSuite): if protocol is None: protocol = cls.default_test_protocol() if host_port is None: - host_port = cls.__get_default_host_port(protocol) + host = cls.impalad_test_service.external_interface + host_port = to_host_port(host, cls._get_default_port(protocol)) + use_ssl = cls.impalad_test_service.use_ssl_for_clients() client = create_connection( host_port=host_port, use_kerberos=pytest.config.option.use_kerberos, - protocol=protocol, is_hive=is_hive, user=user) + protocol=protocol, is_hive=is_hive, user=user, use_ssl=use_ssl) client.connect() return client @@ -413,7 +420,7 @@ class ImpalaTestSuite(BaseTestSuite): host, port = host_port.split(':') port = str(int(port) + nth) host_port = host + ':' + port - return ImpalaTestSuite.create_impala_client(host_port, protocol=protocol) + return cls.create_impala_client(host_port, protocol=protocol) @classmethod def create_impala_clients(cls): @@ -448,6 +455,11 @@ class ImpalaTestSuite(BaseTestSuite): @classmethod def close_impala_clients(cls): """Closes Impala clients created by create_impala_clients().""" + # cls.client should equal one of the clients below unless a test method overrode it. + # Closing twice would lead to error in some clients (impyla+SSL). + if cls.client not in (None, cls.beeswax_client, cls.hs2_client, cls.hs2_http_client): + cls.client.close() + cls.client = None if cls.beeswax_client: cls.beeswax_client.close() cls.beeswax_client = None @@ -457,11 +469,6 @@ class ImpalaTestSuite(BaseTestSuite): if cls.hs2_http_client: cls.hs2_http_client.close() cls.hs2_http_client = None - # cls.client should be equal to one of above, unless test method implicitly override. - # Closing twice should be OK.
- if cls.client: - cls.client.close() - cls.client = None @classmethod def default_impala_client(cls, protocol): @@ -474,13 +481,13 @@ class ImpalaTestSuite(BaseTestSuite): raise Exception("unknown protocol: {0}".format(protocol)) @classmethod - def __get_default_host_port(cls, protocol): + def _get_default_port(cls, protocol): if protocol == BEESWAX: - return IMPALAD + return IMPALAD_BEESWAX_PORT elif protocol == HS2_HTTP: - return IMPALAD_HS2_HTTP_HOST_PORT + return IMPALAD_HS2_HTTP_PORT elif protocol == HS2: - return IMPALAD_HS2_HOST_PORT + return IMPALAD_HS2_PORT else: raise NotImplementedError("Not yet implemented: protocol=" + protocol) @@ -497,13 +504,8 @@ class ImpalaTestSuite(BaseTestSuite): raise NotImplementedError("Not yet implemented: protocol=" + protocol) @classmethod - def create_impala_service( - cls, host_port=IMPALAD, webserver_interface="", webserver_port=25000): - host, port = host_port.split(':') - if webserver_interface == "": - webserver_interface = host - return ImpaladService(host, beeswax_port=port, - webserver_interface=webserver_interface, webserver_port=webserver_port) + def create_impala_service(cls): + return ImpaladService(IMPALAD_HOSTNAME) @classmethod def create_hdfs_client(cls): @@ -773,7 +775,7 @@ class ImpalaTestSuite(BaseTestSuite): target_impalad_clients = list() if multiple_impalad: target_impalad_clients =\ - [ImpalaTestSuite.create_impala_client(host_port, protocol=protocol) + [self.create_impala_client(host_port, protocol=protocol) for host_port in self.__get_cluster_host_ports(protocol)] else: target_impalad_clients = [self.default_impala_client(protocol)] @@ -822,7 +824,7 @@ class ImpalaTestSuite(BaseTestSuite): Helper to execute a query block in Hive. No special handling of query options is done, since we use a separate session for each block. 
""" - h = ImpalaTestSuite.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2, + h = self.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2, is_hive=True) try: result = None @@ -1788,3 +1790,25 @@ class ImpalaTestSuite(BaseTestSuite): break properties[fields[1].rstrip()] = fields[2].rstrip() return properties + + # Checks if an Impala connection is functional. + @staticmethod + def check_connection(conn): + res = conn.execute("select 1 + 1") + assert res.data == ["2"] + + # Checks connections for all protocols. + def check_connections(cls, expected_count=3): + # default client must exist + cls.check_connection(cls.client) + count = 0 + if cls.beeswax_client: + cls.check_connection(cls.beeswax_client) + count += 1 + if cls.hs2_client: + cls.check_connection(cls.hs2_client) + count += 1 + if cls.hs2_http_client: + cls.check_connection(cls.hs2_http_client) + count += 1 + assert count == expected_count diff --git a/tests/common/network.py b/tests/common/network.py index 55502b4eb..7a2b11814 100644 --- a/tests/common/network.py +++ b/tests/common/network.py @@ -19,7 +19,9 @@ from __future__ import absolute_import, division, print_function import socket +import ssl +from tests.common.environ import IS_REDHAT_DERIVATIVE # Retrieves the host external IP rather than localhost/127.0.0.1 so we have an IP that # Impala will consider distinct from storage backends to force remote scheduling. @@ -30,3 +32,49 @@ def get_external_ip(): # Timeout=0 means it doesn't need to resolve. s.connect(('10.254.254.254', 1)) return s.getsockname()[0] + + +def split_host_port(host_port): + """Checks if the host name also contains a port and separates the two. + Returns either (host, None) or (host, port). Detects if host is an ipv6 address + like "[::]" and removes the brackets from it. 
+ """ + is_ipv6_address = host_port.startswith("[") + if is_ipv6_address: + parts = host_port[1:].split("]") + if len(parts) == 1 or not parts[1]: + return (parts[0], None) + return (parts[0], int(parts[1][1:])) + else: + parts = host_port.split(":") + if len(parts) == 1: + return (parts[0], None) + return (parts[0], int(parts[1])) + + +def to_host_port(host, port): + is_ipv6_address = ":" in host and not host.startswith("[") + fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}" + return fmt.format(host, port) + + +CERT_TO_CA_MAP = { + "wildcard-cert.pem": "wildcardCA.pem", + "wildcard-san-cert.pem": "wildcardCA.pem" +} + +REQUIRED_MIN_OPENSSL_VERSION = 0x10001000 +# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5 +# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already +if IS_REDHAT_DERIVATIVE: + REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5) +else: + REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9) +_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None) +if _openssl_version_number is None: + SKIP_SSL_MSG = "Legacy OpenSSL module detected" +elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION: + SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % ( + ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION) +else: + SKIP_SSL_MSG = None diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py index 832b547ae..09c462d97 100644 --- a/tests/custom_cluster/test_client_ssl.py +++ b/tests/custom_cluster/test_client_ssl.py @@ -24,34 +24,20 @@ import pytest import re import requests import signal -import ssl import socket import sys import time -from tests.common.environ import IS_REDHAT_DERIVATIVE + from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_service import ImpaladService +from tests.common.network import SKIP_SSL_MSG, REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 from tests.common.test_dimensions import
create_client_protocol_dimension from tests.common.test_vector import BEESWAX from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expect, \ ImpalaShell, create_impala_shell_executable_dimension -REQUIRED_MIN_OPENSSL_VERSION = 0x10001000 -# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5 -# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already -if IS_REDHAT_DERIVATIVE: - REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5) -else: - REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9) -_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None) -if _openssl_version_number is None: - SKIP_SSL_MSG = "Legacy OpenSSL module detected" -elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION: - SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % ( - ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION) -else: - SKIP_SSL_MSG = None + CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME'] @@ -99,7 +85,6 @@ class TestClientSsl(CustomClusterTestSuite): @CustomClusterTestSuite.with_args(impalad_args=SSL_ARGS, statestored_args=SSL_ARGS, catalogd_args=SSL_ARGS) def test_ssl(self, vector): - self._verify_negative_cases(vector) # TODO: This is really two different tests, but the custom cluster takes too long to # start. Make it so that custom clusters can be specified across test suites. 
@@ -140,6 +125,7 @@ class TestClientSsl(CustomClusterTestSuite): print(result.stderr) assert "Query Status: Cancelled" in result.stdout assert impalad.wait_for_num_in_flight_queries(0) + self.check_connections() WEBSERVER_SSL_ARGS = ("--webserver_certificate_file=%(cert_dir)s/server-cert.pem " "--webserver_private_key_file=%(cert_dir)s/server-key.pem " @@ -217,6 +203,7 @@ class TestClientSsl(CustomClusterTestSuite): self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR, host="ip4.impala.test") + self.check_connections() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args=SSL_WILDCARD_SAN_ARGS, @@ -225,7 +212,6 @@ class TestClientSsl(CustomClusterTestSuite): @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG) def test_wildcard_san_ssl(self, vector): """ Test for IMPALA-3159: Test with a certificate which has a wildcard as a SAN. """ - # This block of code is the same as _validate_positive_cases() but we want to check # if retrieving the SAN is supported first. 
args = ["--ssl", "-q", "select 1 + 2", "--ca_cert=%s/wildcardCA.pem" % CERT_DIR] @@ -239,6 +225,8 @@ class TestClientSsl(CustomClusterTestSuite): self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR, host="ip4.impala.test") + self.check_connections() + def _verify_negative_cases(self, vector, host=""): # Expect the shell to not start successfully if we point --ca_cert to an incorrect diff --git a/tests/custom_cluster/test_event_processing_error.py b/tests/custom_cluster/test_event_processing_error.py index 49d5e9b10..3942e2698 100644 --- a/tests/custom_cluster/test_event_processing_error.py +++ b/tests/custom_cluster/test_event_processing_error.py @@ -352,18 +352,18 @@ class TestEventProcessingError(CustomClusterTestSuite): replication tests """ # inserts on transactional tables - TestEventProcessingBase._run_test_insert_events_impl(unique_database, True) + TestEventProcessingBase._run_test_insert_events_impl(self, unique_database, True) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" try: test_db = unique_database + "_no_transact" self.run_stmt_in_hive("""create database {}""".format(test_db)) # inserts on external tables - TestEventProcessingBase._run_test_insert_events_impl(test_db, False) + TestEventProcessingBase._run_test_insert_events_impl(self, test_db, False) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" finally: self.run_stmt_in_hive("""drop database {} cascade""".format(test_db)) # replication related tests - TestEventProcessingBase._run_event_based_replication_tests_impl( + TestEventProcessingBase._run_event_based_replication_tests_impl(self, self.filesystem_client) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" diff --git a/tests/custom_cluster/test_ipv6.py b/tests/custom_cluster/test_ipv6.py new file mode 100644 index 000000000..f4a0a9de5 --- /dev/null +++ b/tests/custom_cluster/test_ipv6.py @@ -0,0 +1,251 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# 
or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import, division, print_function +import json +import logging +import os +import pytest +import requests + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.network import SKIP_SSL_MSG +from tests.common.test_dimensions import create_client_protocol_dimension +from tests.shell.util import run_impala_shell_cmd, \ + create_impala_shell_executable_dimension +from tests.common.impala_connection import create_connection + +LOG = logging.getLogger('impala_test_suite') + +CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME'] + +# Use wildcard san cert to be flexible about host name. 
+SSL_WILDCARD_SAN_ARGS = ("--ssl_client_ca_certificate={0}/wildcardCA.pem " + "--ssl_server_certificate={0}/wildcard-san-cert.pem " + "--ssl_private_key={0}/wildcard-san-cert.key " + "--hostname={1} " + "--state_store_host={1} " + "--catalog_service_host={1} " + "--webserver_certificate_file={0}/wildcard-san-cert.pem " + "--webserver_private_key_file={0}/wildcard-san-cert.key " + ).format(CERT_DIR, "ip4.impala.test") + +WILDCARD_CA_CERT_PATH = "%s/wildcardCA.pem" % CERT_DIR + +IPV6_ONLY_IP_WEBSERBER_ARG = "--webserver_interface=::1 " +IPV6_DUAL_IP_WEBSERBER_ARG = "--webserver_interface=:: " +IPV6_ONLY_IP_QUERY_ARG = "--external_interface=::1 " +IPV6_DUAL_IP_QUERY_ARG = "--external_interface=:: " +IPV6_ONLY_IP_COORDINATOR_ARG = \ + "%s %s" % (IPV6_ONLY_IP_WEBSERBER_ARG, IPV6_ONLY_IP_QUERY_ARG) +IPV6_DUAL_IP_COORDINATOR_ARG = \ + "%s %s" % (IPV6_DUAL_IP_WEBSERBER_ARG, IPV6_DUAL_IP_QUERY_ARG) + +IPV6_ONLY_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip6.impala.test " +IPV6_DUAL_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip46.impala.test " +IPV6_ONLY_HOSTNAME_QUERY_ARG = "--external_interface=::1 " +IPV6_DUAL_HOSTNAME_QUERY_ARG = "--external_interface=:: " + +WEBUI_PORTS = [25000, 25010, 25020] + +# Error text can depend on both protocol and python version. 
+CONN_ERR = ["Could not connect", "Connection refused"] +CERT_ERR = ["doesn't match", "certificate verify failed"] +WEB_CERT_ERR = "CertificateError" + + +class TestIPv6Base(CustomClusterTestSuite): + ca_cert = None + + @classmethod + def setup_class(cls): + super(TestIPv6Base, cls).setup_class() + + @classmethod + def add_test_dimensions(cls): + super(TestIPv6Base, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension()) + cls.ImpalaTestMatrix.add_dimension(create_impala_shell_executable_dimension()) + + def _smoke(self, host, vector, expected_errors=[]): + proto = vector.get_value('protocol') + try: + port = self._get_default_port(proto) + host_port = "%s:%d" % (host, port) + use_ssl = self.ca_cert is not None + conn = create_connection(host_port, protocol=proto, use_ssl=use_ssl) + conn.connect() + if expected_errors: raise RuntimeError("%s unexpectedly succeeded" % host_port) + res = conn.execute("select 1") + assert res.data == ["1"] + except Exception as ex: + for err in expected_errors: + if err in str(ex): return + raise ex + + def _webui_smoke(self, url, err=None): + """Fetches the JSON root page of 'url' and checks that it reports glibc_version.""" + try: + if self.ca_cert: + other_info_page = requests.get(url + "/?json", verify=self.ca_cert).text + else: + other_info_page = requests.get(url + "/?json", verify=False).text + if err is not None: raise RuntimeError("%s unexpectedly succeeded" % url) + other_info = json.loads(other_info_page) + assert "glibc_version" in other_info + except Exception as ex: + if not err: raise ex + assert err in str(ex) + + def _shell_smoke(self, host, vector, expected_errors=[]): + proto = vector.get_value('protocol') + port = self._get_default_port(proto) + host_port = "%s:%d" % (host, port) + try: + shell_options = ["-i", host_port, "-q", "select 1"] + if self.ca_cert: + shell_options.append("--ssl") + shell_options.append("--ca_cert=" + self.ca_cert) + result = run_impala_shell_cmd(vector, shell_options) + if expected_errors: raise RuntimeError("%s unexpectedly succeeded" % host_port) + assert "Fetched 1 row" in result.stderr + except Exception as ex: + for err in
expected_errors: + if err in str(ex): return + raise ex + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_DUAL_IP_WEBSERBER_ARG + + IPV6_DUAL_IP_QUERY_ARG, + statestored_args=IPV6_DUAL_IP_WEBSERBER_ARG, + catalogd_args=IPV6_DUAL_IP_WEBSERBER_ARG) +class TestIPv6DualNoSsl(TestIPv6Base): + + def test_ipv6_dual_no_ssl(self, vector): + for port in WEBUI_PORTS: + self._webui_smoke("http://127.0.0.1:%d" % port) + self._webui_smoke("http://[::1]:%d" % port) + self._webui_smoke("http://ip4.impala.test:%d" % port) + self._webui_smoke("http://ip6.impala.test:%d" % port) + self._webui_smoke("http://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector) + self._smoke("127.0.0.1", vector) + self._smoke("ip4.impala.test", vector) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector) + self._shell_smoke("127.0.0.1", vector) + self._shell_smoke("ip4.impala.test", vector) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_ONLY_IP_WEBSERBER_ARG + + IPV6_ONLY_IP_QUERY_ARG, + statestored_args=IPV6_ONLY_IP_WEBSERBER_ARG, + catalogd_args=IPV6_ONLY_IP_WEBSERBER_ARG) +class TestIPv6OnlyNoSsl(TestIPv6Base): + + def test_ipv6_only_no_ssl(self, vector): + self.check_connections() + for port in WEBUI_PORTS: + self._webui_smoke("http://127.0.0.1:%d" % port, err="Connection refused") + self._webui_smoke("http://[::1]:%d" % port) + self._webui_smoke("http://ip4.impala.test:%d" % port, err="Connection refused") + self._webui_smoke("http://ip6.impala.test:%d" % port) + self._webui_smoke("http://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector) + self._smoke("127.0.0.1", vector, CONN_ERR) + self._smoke("ip4.impala.test", vector, CONN_ERR) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector) + self._shell_smoke("127.0.0.1", vector, 
CONN_ERR) + self._shell_smoke("ip4.impala.test", vector, CONN_ERR) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + + IPV6_DUAL_HOSTNAME_QUERY_ARG + + SSL_WILDCARD_SAN_ARGS, + statestored_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS, + catalogd_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS) +class TestIPv6DualSsl(TestIPv6Base): + ca_cert = WILDCARD_CA_CERT_PATH + + @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG) + def test_ipv6_dual_ssl(self, vector): + self.check_connections() + for port in WEBUI_PORTS: + self._webui_smoke("https://127.0.0.1:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://[::1]:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://ip4.impala.test:%d" % port) + self._webui_smoke("https://ip6.impala.test:%d" % port) + self._webui_smoke("https://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector, CONN_ERR) + self._smoke("127.0.0.1", vector, CONN_ERR) + self._smoke("ip4.impala.test", vector) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector, CERT_ERR) + self._shell_smoke("127.0.0.1", vector, CERT_ERR) + self._shell_smoke("ip4.impala.test", vector) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + + IPV6_ONLY_HOSTNAME_QUERY_ARG + + SSL_WILDCARD_SAN_ARGS, + statestored_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS, + catalogd_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS) +class TestIPv6OnlySsl(TestIPv6Base): + ca_cert = WILDCARD_CA_CERT_PATH + + @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG) + def test_ipv6_only_ssl(self, vector): + self.check_connections() + for port in WEBUI_PORTS: + 
self._webui_smoke("https://127.0.0.1:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://[::1]:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://ip4.impala.test:%d" % port, "Connection refused") + self._webui_smoke("https://ip6.impala.test:%d" % port) + self._webui_smoke("https://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector, CONN_ERR) + self._smoke("127.0.0.1", vector, CONN_ERR) + self._smoke("ip4.impala.test", vector, CONN_ERR) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector, CERT_ERR) + self._shell_smoke("127.0.0.1", vector, CONN_ERR) + self._shell_smoke("ip4.impala.test", vector, CONN_ERR) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) diff --git a/tests/custom_cluster/test_redaction.py b/tests/custom_cluster/test_redaction.py index 7e4b2aad4..0ca333206 100644 --- a/tests/custom_cluster/test_redaction.py +++ b/tests/custom_cluster/test_redaction.py @@ -39,6 +39,10 @@ class TestRedaction(CustomClusterTestSuite): limited to table data and query text since queries may refer to table data. 
''' + @classmethod + def setup_class(cls): + super(TestRedaction, cls).setup_class() + @property def log_dir(self): return os.path.join(self.tmp_dir, "logs") diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py index 1b05c146a..767833304 100644 --- a/tests/metadata/test_event_processing.py +++ b/tests/metadata/test_event_processing.py @@ -35,10 +35,14 @@ PROCESSING_TIMEOUT_S = 10 LOG = logging.getLogger(__name__) @SkipIfFS.hive -class TestEventProcessing(ImpalaTestSuite): +class TestEventProcessing(TestEventProcessingBase): """This class contains tests that exercise the event processing mechanism in the catalog.""" + @classmethod + def setup_class(cls): + super(TestEventProcessing, cls).setup_class() + @classmethod def default_test_protocol(cls): return HS2 @@ -47,13 +51,13 @@ class TestEventProcessing(ImpalaTestSuite): def test_transactional_insert_events(self, unique_database): """Executes 'run_test_insert_events' for transactional tables. """ - TestEventProcessingBase._run_test_insert_events_impl( + TestEventProcessingBase._run_test_insert_events_impl(self, unique_database, is_transactional=True) def test_insert_events(self, unique_database): """Executes 'run_test_insert_events' for non-transactional tables. 
""" - TestEventProcessingBase._run_test_insert_events_impl(unique_database) + TestEventProcessingBase._run_test_insert_events_impl(self, unique_database) def test_iceberg_inserts(self): """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation @@ -99,13 +103,12 @@ class TestEventProcessing(ImpalaTestSuite): self._run_test_empty_partition_events(unique_database, False) def test_event_based_replication(self): - TestEventProcessingBase._run_event_based_replication_tests_impl( + self._run_event_based_replication_tests_impl(self, self.filesystem_client) def _run_test_empty_partition_events(self, unique_database, is_transactional): test_tbl = unique_database + ".test_events" - TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties( - is_transactional) + TBLPROPERTIES = self._get_transactional_tblproperties(is_transactional) self.run_stmt_in_hive("create table {0} (key string, value string) \ partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES)) self.client.set_configuration({ diff --git a/tests/metadata/test_event_processing_base.py b/tests/metadata/test_event_processing_base.py index 97725a2c0..baafd5358 100644 --- a/tests/metadata/test_event_processing_base.py +++ b/tests/metadata/test_event_processing_base.py @@ -27,17 +27,22 @@ EVENT_SYNC_QUERY_OPTIONS = { class TestEventProcessingBase(ImpalaTestSuite): @classmethod - def _run_test_insert_events_impl(cls, unique_database, is_transactional=False): + def setup_class(cls): + super(TestEventProcessingBase, cls).setup_class() + + @classmethod + def _run_test_insert_events_impl(cls, suite, unique_database, is_transactional=False): """Test for insert event processing. Events are created in Hive and processed in Impala. 
The following cases are tested : Insert into table --> for partitioned and non-partitioned table Insert overwrite table --> for partitioned and non-partitioned table Insert into partition --> for partitioned table """ - with cls.create_impala_client() as impala_client: + # TODO: change into an instance method and remove argument "suite" (IMPALA-14174) + with suite.create_impala_client() as impala_client: # Test table with no partitions. tbl_insert_nopart = 'tbl_insert_nopart' - cls.run_stmt_in_hive( + suite.run_stmt_in_hive( "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart)) tblproperties = "" if is_transactional: @@ -141,16 +146,18 @@ class TestEventProcessingBase(ImpalaTestSuite): assert len(result.data) == 0 @classmethod - def _run_event_based_replication_tests_impl(cls, filesystem_client, transactional=True): + def _run_event_based_replication_tests_impl(cls, suite, + filesystem_client, transactional=True): """Hive Replication relies on the insert events generated on the tables. 
This test issues some basic replication commands from Hive and makes sure that the replicated table has correct data.""" + # TODO: change into an instance method and remove argument "suite" (IMPALA-14174) TBLPROPERTIES = cls._get_transactional_tblproperties(transactional) source_db = ImpalaTestSuite.get_random_name("repl_source_") target_db = ImpalaTestSuite.get_random_name("repl_target_") unpartitioned_tbl = "unpart_tbl" partitioned_tbl = "part_tbl" - impala_client = cls.create_impala_client() + impala_client = suite.create_impala_client() try: cls.run_stmt_in_hive("create database {0}".format(source_db)) cls.run_stmt_in_hive( diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py index 5809cae06..93f9d26a8 100644 --- a/tests/stress/test_acid_stress.py +++ b/tests/stress/test_acid_stress.py @@ -93,7 +93,7 @@ class TestAcidInsertsBasic(TestAcidStress): def _impala_role_write_inserts(self, tbl_name, partitioned): """INSERT INTO/OVERWRITE a table several times from Impala.""" try: - impalad_client = ImpalaTestSuite.create_impala_client() + impalad_client = self.create_impala_client() part_expr = "partition (p=1)" if partitioned else "" for run in range(0, NUM_OVERWRITES + 1): OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i) @@ -109,7 +109,7 @@ class TestAcidInsertsBasic(TestAcidStress): def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds): """SELECT from a table many times until the expected final values are found.""" try: - impalad_client = ImpalaTestSuite.create_impala_client() + impalad_client = self.create_impala_client() expected_result = {"run": -1, "i": 0} accept_empty_table = True while expected_result["run"] != NUM_OVERWRITES and \ @@ -182,7 +182,7 @@ class TestAcidInsertsBasic(TestAcidStress): def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec): insert_op = "OVERWRITE" if is_overwrite else "INTO" try: - impalad_client = ImpalaTestSuite.create_impala_client() + 
impalad_client = self.create_impala_client() impalad_client.execute( """insert {op} table {tbl_name} partition({partition}) select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name, diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py index 5653cc9eb..235c92416 100644 --- a/tests/stress/test_insert_stress.py +++ b/tests/stress/test_insert_stress.py @@ -36,6 +36,10 @@ class TestInsertStress(ImpalaTestSuite): def get_workload(self): return 'targeted-stress' + @classmethod + def setup_class(cls): + super(TestInsertStress, cls).setup_class() + @classmethod def add_test_dimensions(cls): super(TestInsertStress, cls).add_test_dimensions() @@ -46,8 +50,8 @@ class TestInsertStress(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, wid, num_inserts, counter): """Writes ascending numbers up to 'num_inserts' into column 'i'. To column 'wid' it writes its identifier passed in parameter 'wid'.""" - target_impalad = wid % ImpalaTestSuite.get_impalad_cluster_size() - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = wid % self.get_impalad_cluster_size() + impalad_client = self.create_client_for_nth_impalad(target_impalad) try: insert_cnt = 0 while insert_cnt < num_inserts: @@ -72,8 +76,8 @@ class TestInsertStress(ImpalaTestSuite): assert sorted_run == list(range(sorted_run[0], sorted_run[-1] + 1)), \ "wid: %d" % wid - target_impalad = cid % ImpalaTestSuite.get_impalad_cluster_size() - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = cid % self.get_impalad_cluster_size() + impalad_client = self.create_client_for_nth_impalad(target_impalad) try: while counter.value != writers: result = impalad_client.execute("select * from %s" % tbl_name) diff --git a/tests/stress/test_merge_stress.py b/tests/stress/test_merge_stress.py index e948aa650..7bd34cb23 100644 --- a/tests/stress/test_merge_stress.py +++ 
b/tests/stress/test_merge_stress.py @@ -41,8 +41,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite): def _impala_role_concurrent_updater(self, tbl_name, col, num_writes): """Increments values in column 'total' and in the column which is passed in 'col'.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) merge_stmt = """merge into {0} target using {0} source on source.total = target.total when matched then update set @@ -61,8 +61,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, num_inserts): """Adds a new row based on the maximum 'total' value.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) merge_stmt = """merge into {0} target using (select total, a, b, c from {0} order by total desc limit 1) source on source.total +1 = target.total @@ -92,8 +92,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite): assert total == a + b + c return max_total - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) total = 0 while total < target_total: result = impalad_client.execute("select * from %s" % tbl_name) diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py index 06579f7b8..5d76451af 100644 --- 
a/tests/stress/test_update_stress.py +++ b/tests/stress/test_update_stress.py @@ -82,8 +82,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, col, num_updates): """Increments values in column 'total' and in the column which is passed in 'col'.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) update_cnt = 0 while update_cnt < num_updates: try: @@ -107,8 +107,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite): assert total == a + b + c return total - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) total = 0 while total < target_total: result = impalad_client.execute("select * from %s" % tbl_name) @@ -163,8 +163,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows): """Deletes every row from the table one by one.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") i = 0 while i < num_rows: @@ -181,8 +181,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted): """Updates every row in the table in a loop.""" - 
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") while all_rows_deleted.value != 1: try: @@ -196,8 +196,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted): """Optimizes the table in a loop.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") while all_rows_deleted.value != 1: try: @@ -253,8 +253,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): prev_j = j assert prev_id == num_rows - 1 - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) while all_rows_deleted.value != 1: result = impalad_client.execute("select * from %s order by id" % tbl_name) verify_result_set(result)