From 5cca1aa9e5e1133640fb0e75630c719ebaa63ac1 Mon Sep 17 00:00:00 2001 From: Csaba Ringhofer Date: Sun, 23 Feb 2025 23:53:48 +0100 Subject: [PATCH] IMPALA-13820: add ipv6 support for webui/hs2/hs2-http/beeswax Main changes: - added flag external_interface to override hostname for beeswax/hs2/hs2-http port to allow testing ipv6 on these interfaces without forcing ipv6 on internal communication - compile Squeasel with USE_IPV6 to allow ipv6 on webui (webui interface can be configured with existing flag webserver_interface) - fixed the handling of []. style addresses in impala-shell (e.g. [::1]:21050) and test framework - improved handling of custom clusters in test framework to allow webui/ImpalaTestSuite's clients to work with non standard settings (also fixes these clients with SSL) Using ipv4 vs ipv6 vs dual stack can be configured by setting the interface to bind to with flag webserver_interface and external_interface. The Thrift server behind hs2/hs2-http/beeswax only accepts a single host name and uses the first address returned by getaddrinfo() that it can successfully bind to. This means that unless an ipv6 address is used (like ::1) the behavior will depend on the order of addresses returned by getaddrinfo(): https://github.com/apache/thrift/blob/63b7a263fc669c56fedca5d9a7310902d98df335/lib/cpp/src/thrift/transport/TServerSocket.cpp#L481 For dual stack the only way currently is to bind to "::", as the Thrift server can only listen a single socket. Testing: - added custom cluster tests for ipv6 only/dual interface with and without SSL - manually tested in dual stack environment with client on a different host - among clients impala-shell and impyla are tested, but not JDBC/ODBC - no tests yet on truly ipv6 only environment, as internal communication (e.g. krpc) is not ready for ipv6 To test manually the dev cluster can be started with ipv6 support: dual mode: bin/start-impala-cluster.py --impalad_args="--external_interface=:: --webserver_interface=::" --catalogd_args="--webserver_interface=::" --state_store_args="--webserver_interface=::" ipv6 only: bin/start-impala-cluster.py --impalad_args="--external_interface=::1 --webserver_interface=::1" --catalogd_args="--webserver_interface=::1" --state_store_args="--webserver_interface=::1" Change-Id: I51ac66c568cc9bb06f4a3915db07a53c100109b6 Reviewed-on: http://gerrit.cloudera.org:8080/22527 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- be/src/rpc/thrift-server.cc | 7 +- be/src/rpc/thrift-server.h | 13 +- be/src/service/impala-server.cc | 8 + be/src/util/CMakeLists.txt | 2 + be/src/util/network-util.cc | 40 ++- be/src/util/network-util.h | 8 +- be/src/util/webserver.cc | 24 +- shell/impala_shell/impala_client.py | 10 +- shell/impala_shell/impala_shell.py | 31 ++- tests/beeswax/impala_beeswax.py | 13 +- tests/common/custom_cluster_test_suite.py | 7 + tests/common/impala_cluster.py | 17 +- tests/common/impala_connection.py | 5 +- tests/common/impala_service.py | 75 +++--- tests/common/impala_test_suite.py | 66 +++-- tests/common/network.py | 48 ++++ tests/custom_cluster/test_client_ssl.py | 26 +- .../test_event_processing_error.py | 6 +- tests/custom_cluster/test_ipv6.py | 251 ++++++++++++++++++ tests/custom_cluster/test_redaction.py | 4 + tests/metadata/test_event_processing.py | 15 +- tests/metadata/test_event_processing_base.py | 17 +- tests/stress/test_acid_stress.py | 6 +- tests/stress/test_insert_stress.py | 12 +- tests/stress/test_merge_stress.py | 12 +- tests/stress/test_update_stress.py | 24 +- 26 files changed, 594 insertions(+), 153 deletions(-) create mode 100644 tests/custom_cluster/test_ipv6.py diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc index 181741664..f8cab8e48 100644 --- a/be/src/rpc/thrift-server.cc +++ b/be/src/rpc/thrift-server.cc @@ -270,9 +270,10 @@ ThriftServer::ThriftServer(const string& name, const std::shared_ptr& processor, int port, AuthProvider* auth_provider, MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms, int64_t idle_poll_period_ms, TransportType transport_type, - bool is_external_facing) + bool is_external_facing, string host) : started_(false), port_(port), + host_(std::move(host)), ssl_enabled_(false), max_concurrent_connections_(max_concurrent_connections), queue_timeout_ms_(queue_timeout_ms), @@ -342,7 +343,7 @@ Status ThriftServer::CreateSocket(std::shared_ptr* socket) { socket_factory->loadCertificate(certificate_path_.c_str()); socket_factory->loadPrivateKey(private_key_path_.c_str()); ImpalaKeepAliveServerSocket* server_socket = - new ImpalaKeepAliveServerSocket(port_, socket_factory); + new ImpalaKeepAliveServerSocket(host_, port_, socket_factory); server_socket->setKeepAliveOptions(keepalive_probe_period_s_, keepalive_retry_period_s_, keepalive_retry_count_); socket->reset(server_socket); @@ -351,7 +352,7 @@ Status ThriftServer::CreateSocket(std::shared_ptr* socket) { } } else { ImpalaKeepAliveServerSocket* server_socket = - new ImpalaKeepAliveServerSocket(port_); + new ImpalaKeepAliveServerSocket(host_, port_); server_socket->setKeepAliveOptions(keepalive_probe_period_s_, keepalive_retry_period_s_, keepalive_retry_count_); socket->reset(server_socket); diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h index 820526ca1..a6fe327a7 100644 --- a/be/src/rpc/thrift-server.h +++ b/be/src/rpc/thrift-server.h @@ -309,7 +309,7 @@ class ThriftServer { int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0, int64_t idle_poll_period_ms = 0, TransportType server_transport = TransportType::BINARY, - bool is_external_facing = true); + bool is_external_facing = true, std::string host = ""); /// Enables secure access over SSL. Must be called before Start(). The first three /// arguments are the minimum SSL/TLS version, and paths to certificate and private key @@ -345,6 +345,9 @@ class ThriftServer { /// replaced with whatever port number the server is listening on. int port_; + /// The host name to bind with. + string host_; + /// True if the server socket only accepts SSL connections bool ssl_enabled_; @@ -546,6 +549,11 @@ class ThriftServerBuilder { return *this; } + ThriftServerBuilder& host(const string& host) { + host_ = host; + return *this; + } + /// Constructs a new ThriftServer and puts it in 'server', if construction was /// successful, returns an error otherwise. In the error case, 'server' will not have /// been set and will not need to be freed, otherwise the caller assumes ownership of @@ -554,7 +562,7 @@ class ThriftServerBuilder { std::unique_ptr ptr( new ThriftServer(name_, processor_, port_, auth_provider_, metrics_, max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_, - server_transport_type_, is_external_facing_)); + server_transport_type_, is_external_facing_, host_)); if (enable_ssl_) { RETURN_IF_ERROR(ptr->EnableSsl( version_, certificate_, private_key_, pem_password_cmd_, cipher_list_, @@ -572,6 +580,7 @@ class ThriftServerBuilder { int max_concurrent_connections_ = 0; std::string name_; std::shared_ptr processor_; + std::string host_; int port_ = 0; ThriftServer::TransportType server_transport_type_ = ThriftServer::TransportType::BINARY; diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index a8bfd5ace..9584405ad 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -157,6 +157,9 @@ DEFINE_bool(enable_external_fe_http, false, "if true enables http transport for external_fe_port otherwise binary transport is " "used"); +DEFINE_string(external_interface, "", + "Host name to bind with in beeswax/hs2/hs2-http. \"::\" allows IPv6 (dual stack)."); + DEFINE_int32(fe_service_threads, 64, "number of threads available to serve client requests"); DEFINE_string(default_query_options, "", "key=value pair of default query options for" @@ -3238,6 +3241,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, .keepalive(FLAGS_client_keepalive_probe_period_s, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) + .host(FLAGS_external_interface) .Build(&server)); beeswax_server_.reset(server); beeswax_server_->SetConnectionHandler(this); @@ -3270,6 +3274,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, .keepalive(FLAGS_client_keepalive_probe_period_s, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) + .host(FLAGS_external_interface) .Build(&server)); hs2_server_.reset(server); hs2_server_->SetConnectionHandler(this); @@ -3305,6 +3310,8 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) .Build(&server)); + // FLAGS_external_interface is not passed to external external_fe_port. If this is + // needed (e.g. for dual stack) then another subtask can be added to IMPALA-13819. external_fe_server_.reset(server); external_fe_server_->SetConnectionHandler(this); } @@ -3338,6 +3345,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port, .keepalive(FLAGS_client_keepalive_probe_period_s, FLAGS_client_keepalive_retry_period_s, FLAGS_client_keepalive_retry_count) + .host(FLAGS_external_interface) .Build(&http_server)); hs2_http_server_.reset(http_server); hs2_http_server_->SetConnectionHandler(this); diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 4468f3ba7..e254620b4 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -163,6 +163,8 @@ add_dependencies(Util gen-deps) # Squeasel requires C99 compatibility to build. SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c PROPERTIES COMPILE_FLAGS -std=c99) +SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c + PROPERTIES COMPILE_FLAGS -DUSE_IPV6) # shared library which provides native logging support to JVMs over JNI. add_library(loggingsupport SHARED diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index b59cd408b..61b499eab 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -53,6 +53,7 @@ using std::random_device; namespace impala { const string LOCALHOST_IP_STR("127.0.0.1"); +const string LOCALHOST_IP_V6_STR("::1"); Status GetHostname(string* hostname) { char name[HOST_NAME_MAX]; @@ -67,33 +68,44 @@ Status GetHostname(string* hostname) { return Status::OK(); } -Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){ +Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip, bool ipv6){ // Try to resolve via the operating system. vector addresses; addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_INET; // IPv4 addresses only + hints.ai_family = ipv6 ? AF_INET6 : AF_INET; hints.ai_socktype = SOCK_STREAM; struct addrinfo* addr_info; if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) { stringstream ss; - ss << "Could not find IPv4 address for: " << hostname; + ss << "Could not find IPv" << (ipv6 ? 6 : 4) << " address for: " << hostname; return Status(ss.str()); } addrinfo* it = addr_info; while (it != NULL) { - char addr_buf[64]; - const char* result = - inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 64); + char addr_buf[INET6_ADDRSTRLEN]; + const char* result = nullptr; + bool is_ipv6_addr = it->ai_family == AF_INET6; + if (is_ipv6_addr) { + result = inet_ntop(AF_INET6, &((sockaddr_in6*)it->ai_addr)->sin6_addr, + addr_buf, sizeof(addr_buf)); + } else { + result = inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, + addr_buf, sizeof(addr_buf)); + } + if (result == NULL) { stringstream ss; - ss << "Could not convert IPv4 address for: " << hostname; + ss << "Could not convert IPv" << (is_ipv6_addr ? 6: 4) + << "address for: " << hostname; freeaddrinfo(addr_info); return Status(ss.str()); } - addresses.push_back(string(addr_buf)); + if (is_ipv6_addr == ipv6) { + addresses.push_back(string(addr_buf)); + } it = it->ai_next; } @@ -101,7 +113,7 @@ Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){ if (addresses.empty()) { stringstream ss; - ss << "Could not convert IPv4 address for: " << hostname; + ss << "Could not convert IPv" << (ipv6 ? 6 : 4) << " address for: " << hostname; return Status(ss.str()); } @@ -130,7 +142,7 @@ bool IsResolvedAddress(const NetworkAddressPB& addr) { bool FindFirstNonLocalhost(const vector& addresses, string* addr) { for (const string& candidate: addresses) { - if (candidate != LOCALHOST_IP_STR) { + if (candidate != LOCALHOST_IP_STR && candidate != LOCALHOST_IP_V6_STR) { *addr = candidate; return true; } @@ -226,7 +238,13 @@ bool IsWildcardAddress(const string& ipaddress) { string TNetworkAddressToString(const TNetworkAddress& address) { stringstream ss; - ss << address.hostname << ":" << dec << address.port; + if (address.hostname.find(':') == string::npos) { + // IPv4 + ss << address.hostname << ":" << dec << address.port; + } else { + // IPv6 + ss << "[" << address.hostname << "]:" << dec << address.port; + } return ss.str(); } diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index 0facbb191..88071cc6a 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -42,9 +42,11 @@ bool IsResolvedAddress(const NetworkAddressPB& addr); /// Looks up all IP addresses associated with a given hostname and returns one of them via /// 'address'. If the IP addresses of a host don't change, then subsequent calls will -/// always return the same address. Returns an error status if any system call failed, -/// otherwise OK. Even if OK is returned, addresses may still be of zero length. -Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) WARN_UNUSED_RESULT; +/// always return the same address. Returns an error status if any system call failed or +/// no address was found, otherwise OK. +/// Returns only ipv6 addresses if 'ipv6' is true, otherwise only ipv4 addresses. +Status HostnameToIpAddr( + const Hostname& hostname, IpAddr* ip, bool ipv6=false) WARN_UNUSED_RESULT; /// Finds the first non-localhost IP address in the given list. Returns /// true if such an address was found, false otherwise. diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc index 2d8ac1ac5..be7e3300d 100644 --- a/be/src/util/webserver.cc +++ b/be/src/util/webserver.cc @@ -408,17 +408,25 @@ bool Webserver::IsSecure() const { Status Webserver::Start() { LOG(INFO) << "Starting webserver on " << TNetworkAddressToString(http_address_); - IpAddr ip; - RETURN_IF_ERROR(HostnameToIpAddr(http_address_.hostname, &ip)); - stringstream listening_spec; - listening_spec << ip << ":" << http_address_.port; + IpAddr ipv4, ipv6; + Status ip_v6_status = HostnameToIpAddr(http_address_.hostname, &ipv6, true); + Status ip_v4_status = HostnameToIpAddr(http_address_.hostname, &ipv4, false); + if (!ip_v4_status.ok() && !ip_v6_status.ok()) return ip_v6_status; - if (IsSecure()) { - LOG(INFO) << "Webserver: Enabling HTTPS support"; - // Squeasel makes sockets with 's' suffixes accept SSL traffic only - listening_spec << "s"; + if (IsSecure()) LOG(INFO) << "Webserver: Enabling HTTPS support"; + stringstream listening_spec; + if (ip_v6_status.ok()) { + listening_spec << "[" << ipv6 << "]:" << http_address_.port; + if (IsSecure()) listening_spec << "s"; + } + if (ip_v4_status.ok()) { + if (ip_v6_status.ok()) listening_spec << ","; + listening_spec << ipv4 << ":" << http_address_.port; + if (IsSecure()) listening_spec << "s"; } string listening_str = listening_spec.str(); + LOG(INFO) << "Starting webserver listening to " << listening_str; + vector options; if (!FLAGS_webserver_doc_root.empty() && FLAGS_enable_webserver_doc_root) { diff --git a/shell/impala_shell/impala_client.py b/shell/impala_shell/impala_client.py index 8e6164746..7fae660bd 100755 --- a/shell/impala_shell/impala_client.py +++ b/shell/impala_shell/impala_client.py @@ -429,8 +429,7 @@ class ImpalaClient(object): # symptoms in case of a problematic remote endpoint. It's better to have a finite # timeout so that in case of any connection errors, the client retries have a better # chance of succeeding. - - host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port) + host_and_port = self._to_host_port(self.impalad_host, self.impalad_port) assert self.http_path # ImpalaHttpClient relies on the URI scheme (http vs https) to open an appropriate # connection to the server. @@ -582,6 +581,13 @@ class ImpalaClient(object): num_deleted_rows = sum([int(k) for k in dml_result.rows_deleted.values()]) return (num_rows, num_deleted_rows, dml_result.num_row_errors) + @staticmethod + def _to_host_port(host, port): + # Wrap ipv6 addresses in brackets. + is_ipv6_address = ":" in host + fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}" + return fmt.format(host, port) + class ImpalaHS2Client(ImpalaClient): """Impala client. Uses the HS2 protocol plus Impala-specific extensions.""" diff --git a/shell/impala_shell/impala_shell.py b/shell/impala_shell/impala_shell.py index 6de925ccb..2445e8629 100755 --- a/shell/impala_shell/impala_shell.py +++ b/shell/impala_shell/impala_shell.py @@ -1001,6 +1001,31 @@ class ImpalaShell(cmd.Cmd, object): """Exit the impala shell""" return self.do_quit(args) + @staticmethod + def __parse_host_port(addr): + """Checks if the host name also contains a port and separates the two. + Returns either [host] or [host, port]. Detects if host is an ipv6 address like "[::]" + and removes the brackets from it. + """ + split_by_colon = addr.split(':') + ipv6addr = len(split_by_colon) > 2 + host_port = None + if ipv6addr: + if addr[0] == "[": + parts = addr.split("]") + host_port = [parts[0][1:]] + has_port = parts[1] != "" + if has_port: + if parts[1][0] != ":": return None + host_port.append(parts[1][1:]) + else: + host_port = [addr] + else: + host_port = [val for val in split_by_colon if val.strip()] + # validate the connection string. + if ':' in addr and len(host_port) != 2: return None + return host_port + def do_connect(self, args): """Connect to an Impalad instance: Usage: connect, defaults to the fqdn of the localhost and the protocol's default port @@ -1023,10 +1048,10 @@ class ImpalaShell(cmd.Cmd, object): if not args: args = socket.getfqdn() tokens = args.split(" ") - # validate the connection string. - host_port = [val for val in tokens[0].split(':') if val.strip()] + addr = tokens[0] + host_port = self.__parse_host_port(addr) protocol = options.protocol.lower() - if (':' in tokens[0] and len(host_port) != 2): + if not host_port: print("Connection string must either be empty, or of the form " "", file=sys.stderr) return CmdStatus.ERROR diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py index ac0eba84c..0ccc78159 100644 --- a/tests/beeswax/impala_beeswax.py +++ b/tests/beeswax/impala_beeswax.py @@ -35,6 +35,7 @@ import sys import time from builtins import filter, map + from thrift.protocol import TBinaryProtocol from thrift.Thrift import TApplicationException from thrift.transport.TTransport import TTransportException @@ -42,6 +43,7 @@ from thrift.transport.TTransport import TTransportException from impala_thrift_gen.beeswax import BeeswaxService from impala_thrift_gen.beeswax.BeeswaxService import QueryState from impala_thrift_gen.ImpalaService import ImpalaService +from tests.common.network import split_host_port from tests.util.thrift_util import create_transport LOG = logging.getLogger('impala_beeswax') @@ -115,12 +117,9 @@ class ImpalaBeeswaxClient(object): def __init__(self, impalad, use_kerberos=False, user=None, password=None, use_ssl=False): self.connected = False - split_impalad = impalad.split(":") - assert len(split_impalad) in [1, 2] - self.impalad_host = split_impalad[0] - self.impalad_port = 21000 # Default beeswax port - if len(split_impalad) == 2: - self.impalad_port = int(split_impalad[1]) + host, port = split_host_port(impalad) + self.impalad_host = host + self.impalad_port = port if port else 21000 # Default beeswax port self.imp_service = None self.transport = None self.use_kerberos = use_kerberos @@ -170,7 +169,7 @@ class ImpalaBeeswaxClient(object): def close_connection(self): """Close the transport if it's still open""" - if self.transport: + if self.transport and self.connected: self.transport.close() self.connected = False diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py index f39072050..5cf512c70 100644 --- a/tests/common/custom_cluster_test_suite.py +++ b/tests/common/custom_cluster_test_suite.py @@ -614,6 +614,7 @@ class CustomClusterTestSuite(ImpalaTestSuite): # Failure tests expect cluster to be initialised even if start-impala-cluster fails. cls.cluster = ImpalaCluster.get_e2e_test_cluster() + cls.impalad_test_service = cls.create_impala_service() PREVIOUS_CMD_STR = cmd_str @@ -650,3 +651,9 @@ class CustomClusterTestSuite(ImpalaTestSuite): for impalad in cls.cluster.impalads: impalad.service.wait_for_num_known_live_backends(expected_num_impalads, timeout=impalad_timeout_s) + + @classmethod + def create_impala_service(cls): + """Override ImpalaTestSuite to return 1st impalad of custom cluster. + Returns None if no impalad was started.""" + return cls.cluster.impalads[0].service if cls.cluster.impalads else None diff --git a/tests/common/impala_cluster.py b/tests/common/impala_cluster.py index c6c44bc76..96c436735 100644 --- a/tests/common/impala_cluster.py +++ b/tests/common/impala_cluster.py @@ -570,14 +570,21 @@ class BaseImpalaProcess(Process): def _get_webserver_certificate_file(self): # TODO: if this is containerised, the path will likely not be the same on the host. + # TODO: what we need in the client is the CA, not the server cert return self._get_arg_value("webserver_certificate_file", "") + def _get_ssl_client_ca_certificate(self): + return self._get_arg_value("ssl_client_ca_certificate", "") + def _get_hostname(self): return self._get_arg_value("hostname", socket.gethostname()) def _get_webserver_interface(self): return self._get_arg_value("webserver_interface", socket.gethostname()) + def _get_external_interface(self): + return self._get_arg_value("external_interface", socket.gethostname()) + def _get_arg_value(self, arg_name, default=None): """Gets the argument value for given argument name""" for arg in self.cmd: @@ -600,10 +607,12 @@ class BaseImpalaProcess(Process): class ImpaladProcess(BaseImpalaProcess): def __init__(self, cmd, container_id=None, port_map=None): super(ImpaladProcess, self).__init__(cmd, container_id, port_map) + self.external_interface = self._get_external_interface() self.service = ImpaladService(self.hostname, self.webserver_interface, + self.external_interface, self.get_webserver_port(), self.__get_beeswax_port(), self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(), - self._get_webserver_certificate_file()) + self._get_webserver_certificate_file(), self._get_ssl_client_ca_certificate()) def _get_default_webserver_port(self): return DEFAULT_IMPALAD_WEBSERVER_PORT @@ -685,7 +694,7 @@ class StateStoreProcess(BaseImpalaProcess): super(StateStoreProcess, self).__init__(cmd, container_id, port_map) self.service = StateStoredService(self.hostname, self.webserver_interface, self.get_webserver_port(), self._get_webserver_certificate_file(), - self.__get_port()) + self._get_ssl_client_ca_certificate(), self.__get_port()) def _get_default_webserver_port(self): return DEFAULT_STATESTORED_WEBSERVER_PORT @@ -712,6 +721,7 @@ class CatalogdProcess(BaseImpalaProcess): super(CatalogdProcess, self).__init__(cmd, container_id, port_map) self.service = CatalogdService(self.hostname, self.webserver_interface, self.get_webserver_port(), self._get_webserver_certificate_file(), + self._get_ssl_client_ca_certificate(), self.__get_port()) def _get_default_webserver_port(self): @@ -743,7 +753,8 @@ class AdmissiondProcess(BaseImpalaProcess): def __init__(self, cmd, container_id=None, port_map=None): super(AdmissiondProcess, self).__init__(cmd, container_id, port_map) self.service = AdmissiondService(self.hostname, self.webserver_interface, - self.get_webserver_port(), self._get_webserver_certificate_file()) + self.get_webserver_port(), self._get_webserver_certificate_file(), + self._get_ssl_client_ca_certificate()) def _get_default_webserver_port(self): return DEFAULT_ADMISSIOND_WEBSERVER_PORT diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py index f57c1f3a3..9aad86e66 100644 --- a/tests/common/impala_connection.py +++ b/tests/common/impala_connection.py @@ -40,6 +40,7 @@ from tests.beeswax.impala_beeswax import ( ImpalaBeeswaxException, ) import tests.common +from tests.common.network import split_host_port from tests.common.patterns import LOG_FORMAT from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP from tests.util.thrift_util import op_handle_to_query_id, session_handle_to_session_id @@ -610,13 +611,13 @@ class ImpylaHS2Connection(ImpalaConnection): return self.__cursor def connect(self): - host, port = self.__host_port.split(":") + host, port = split_host_port(self.__host_port) conn_kwargs = {} if self._is_hive: conn_kwargs['auth_mechanism'] = 'PLAIN' try: self.__impyla_conn = impyla.connect( - host=host, port=int(port), use_http_transport=self.__use_http_transport, + host=host, port=port, use_http_transport=self.__use_http_transport, http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs) self.log_client("connected to {0} with impyla {1}".format( self.__host_port, self.get_test_protocol())) diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py index ae9707ff8..fe808a499 100644 --- a/tests/common/impala_service.py +++ b/tests/common/impala_service.py @@ -35,6 +35,7 @@ from thrift.transport.TSocket import TSocket from thrift.transport.TTransport import TBufferedTransport from tests.common.impala_connection import create_connection, create_ldap_connection +from tests.common.network import to_host_port, CERT_TO_CA_MAP from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP LOG = logging.getLogger('impala_service') @@ -48,7 +49,7 @@ WEBSERVER_PASSWORD = os.environ.get('IMPALA_WEBSERVER_PASSWORD', None) # TODO: Refactor the retry/timeout logic into a common place. class BaseImpalaService(object): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file): + webserver_certificate_file, ssl_client_ca_certificate_file): self.hostname = hostname self.webserver_interface = webserver_interface if webserver_interface == "": @@ -56,6 +57,7 @@ class BaseImpalaService(object): self.webserver_interface = hostname self.webserver_port = webserver_port self.webserver_certificate_file = webserver_certificate_file + self.ssl_client_ca_certificate_file = ssl_client_ca_certificate_file self.webserver_username_password = None if WEBSERVER_USERNAME is not None and WEBSERVER_PASSWORD is not None: self.webserver_username_password = (WEBSERVER_USERNAME, WEBSERVER_PASSWORD) @@ -68,10 +70,14 @@ class BaseImpalaService(object): protocol = "http" if self.webserver_certificate_file != "": protocol = "https" - url = "%s://%s:%d/%s" % \ - (protocol, self.webserver_interface, int(self.webserver_port), page_name) - return requests.get(url, verify=self.webserver_certificate_file, - auth=self.webserver_username_password) + host_port = to_host_port(self.webserver_interface, self.webserver_port) + url = "%s://%s/%s" % (protocol, host_port, page_name) + cert = self.webserver_certificate_file + # Instead of cert use its CA cert if available. + file_part = cert.split("/")[-1] + if file_part in CERT_TO_CA_MAP: + cert = cert.replace(file_part, CERT_TO_CA_MAP[file_part]) + return requests.get(url, verify=cert, auth=self.webserver_username_password) except Exception as e: LOG.info("Debug webpage not yet available: %s", str(e)) sleep(interval) @@ -260,11 +266,14 @@ class BaseImpalaService(object): # Allows for interacting with an Impalad instance to perform operations such as creating # new connections or accessing the debug webpage. class ImpaladService(BaseImpalaService): - def __init__(self, hostname, webserver_interface="", webserver_port=25000, - beeswax_port=21000, krpc_port=27000, hs2_port=21050, - hs2_http_port=28000, webserver_certificate_file=""): + def __init__(self, hostname, webserver_interface="", external_interface="", + webserver_port=25000, beeswax_port=21000, krpc_port=27000, hs2_port=21050, + hs2_http_port=28000, webserver_certificate_file="", + ssl_client_ca_certificate_file=""): super(ImpaladService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) + self.external_interface = external_interface if external_interface else hostname self.beeswax_port = beeswax_port self.krpc_port = krpc_port self.hs2_port = hs2_port @@ -444,30 +453,32 @@ class ImpaladService(BaseImpalaService): sleep(interval) return False - def is_port_open(self, port): + def use_ssl_for_clients(self): + return self.ssl_client_ca_certificate_file != "" + + def is_port_open(self, host, port): try: - sock = socket.create_connection((self.hostname, port), timeout=1) + sock = socket.create_connection((host, port), timeout=1) sock.close() return True except Exception: return False def webserver_port_is_open(self): - return self.is_port_open(self.webserver_port) + return self.is_port_open(self.webserver_interface, self.webserver_port) def create_beeswax_client(self, use_kerberos=False): """Creates a new beeswax client connection to the impalad. DEPRECATED: Use create_hs2_client() instead.""" - LOG.warning('beeswax protocol is deprecated.') - client = create_connection('%s:%d' % (self.hostname, self.beeswax_port), - use_kerberos, BEESWAX) + client = create_connection(to_host_port(self.external_interface, self.beeswax_port), + use_kerberos, BEESWAX, use_ssl=self.use_ssl_for_clients()) client.connect() return client def beeswax_port_is_open(self): """Test if the beeswax port is open. Does not need to authenticate.""" # Check if the port is open first to avoid chatty logging of Thrift connection. - if not self.is_port_open(self.beeswax_port): return False + if not self.is_port_open(self.external_interface, self.beeswax_port): return False try: # The beeswax client will connect successfully even if not authenticated. @@ -475,31 +486,31 @@ class ImpaladService(BaseImpalaService): client.close() return True except Exception as e: - LOG.info(e) return False def create_ldap_beeswax_client(self, user, password, use_ssl=False): - client = create_ldap_connection('%s:%d' % (self.hostname, self.beeswax_port), + client = create_ldap_connection(to_host_port(self.hostname, self.beeswax_port), user=user, password=password, use_ssl=use_ssl) client.connect() return client def create_hs2_client(self, user=None): """Creates a new HS2 client connection to the impalad""" - client = create_connection('%s:%d' % (self.hostname, self.hs2_port), - protocol=HS2, user=user) + client = create_connection('%s:%d' % (self.external_interface, self.hs2_port), + protocol=HS2, user=user, + use_ssl=self.use_ssl_for_clients()) client.connect() return client def hs2_port_is_open(self): """Test if the HS2 port is open. Does not need to authenticate.""" # Check if the port is open first to avoid chatty logging of Thrift connection. - if not self.is_port_open(self.hs2_port): return False + if not self.is_port_open(self.external_interface, self.hs2_port): return False # Impyla will try to authenticate as part of connecting, so preserve previous logic # that uses the HS2 thrift code directly. try: - sock = TSocket(self.hostname, self.hs2_port) + sock = TSocket(self.external_interface, self.hs2_port) transport = TBufferedTransport(sock) transport.open() transport.close() @@ -510,7 +521,7 @@ class ImpaladService(BaseImpalaService): def hs2_http_port_is_open(self): # Only check if the port is open, do not create Thrift transport. - return self.is_port_open(self.hs2_http_port) + return self.is_port_open(self.external_interface, self.hs2_http_port) def create_client(self, protocol): """Creates a new client connection for given protocol to this impalad""" @@ -520,7 +531,8 @@ class ImpaladService(BaseImpalaService): if protocol == BEESWAX: LOG.warning('beeswax protocol is deprecated.') port = self.beeswax_port - client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol) + client = create_connection(to_host_port(self.external_interface, port), + protocol=protocol) client.connect() return client @@ -537,9 +549,10 @@ class ImpaladService(BaseImpalaService): # accessing the debug webpage. class StateStoredService(BaseImpalaService): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file, service_port): + webserver_certificate_file, ssl_client_ca_certificate_file, service_port): super(StateStoredService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) self.service_port = service_port def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1): @@ -554,9 +567,10 @@ class StateStoredService(BaseImpalaService): # accessing the debug webpage. class CatalogdService(BaseImpalaService): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file, service_port): + webserver_certificate_file, ssl_client_ca_certificate_file, service_port): super(CatalogdService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) self.service_port = service_port def get_catalog_version(self, timeout=10, interval=1): @@ -579,6 +593,7 @@ class CatalogdService(BaseImpalaService): class AdmissiondService(BaseImpalaService): def __init__(self, hostname, webserver_interface, webserver_port, - webserver_certificate_file): + webserver_certificate_file, ssl_client_ca_certificate_file): super(AdmissiondService, self).__init__( - hostname, webserver_interface, webserver_port, webserver_certificate_file) + hostname, webserver_interface, webserver_port, webserver_certificate_file, + ssl_client_ca_certificate_file) diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 0ae933bbd..77a5bc253 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -49,6 +49,7 @@ from tests.common.environ import ( from tests.common.errors import Timeout from tests.common.impala_connection import create_connection from tests.common.impala_service import ImpaladService +from tests.common.network import to_host_port from tests.common.test_dimensions import ( ALL_BATCH_SIZES, ALL_DISABLE_CODEGEN_OPTIONS, @@ -265,7 +266,11 @@ class ImpalaTestSuite(BaseTestSuite): cls.hs2_http_client = None cls.hive_client = None cls.hive_transport = None + + # In case of custom cluster tests this returns the 1st impalad or None if nothing + # is started. cls.impalad_test_service = cls.create_impala_service() + ImpalaTestSuite.impalad_test_service = cls.impalad_test_service # Override the shell history path so that commands run by any tests # don't write any history into the developer's file. @@ -377,10 +382,12 @@ class ImpalaTestSuite(BaseTestSuite): if protocol is None: protocol = cls.default_test_protocol() if host_port is None: - host_port = cls.__get_default_host_port(protocol) + host = cls.impalad_test_service.external_interface + host_port = to_host_port(host, cls._get_default_port(protocol)) + use_ssl = cls.impalad_test_service.use_ssl_for_clients() client = create_connection( host_port=host_port, use_kerberos=pytest.config.option.use_kerberos, - protocol=protocol, is_hive=is_hive, user=user) + protocol=protocol, is_hive=is_hive, user=user, use_ssl=use_ssl) client.connect() return client @@ -413,7 +420,7 @@ class ImpalaTestSuite(BaseTestSuite): host, port = host_port.split(':') port = str(int(port) + nth) host_port = host + ':' + port - return ImpalaTestSuite.create_impala_client(host_port, protocol=protocol) + return cls.create_impala_client(host_port, protocol=protocol) @classmethod def create_impala_clients(cls): @@ -448,6 +455,11 @@ class ImpalaTestSuite(BaseTestSuite): @classmethod def close_impala_clients(cls): """Closes Impala clients created by create_impala_clients().""" + # cls.client should be equal to one of belove, unless test method implicitly override. + # Closing twice would lead to error in some clients (impyla+SSL). + if cls.client not in (cls.beeswax_client, cls.hs2_client, cls.hs2_http_client): + cls.client.close() + cls.client = None if cls.beeswax_client: cls.beeswax_client.close() cls.beeswax_client = None @@ -457,11 +469,6 @@ class ImpalaTestSuite(BaseTestSuite): if cls.hs2_http_client: cls.hs2_http_client.close() cls.hs2_http_client = None - # cls.client should be equal to one of above, unless test method implicitly override. - # Closing twice should be OK. - if cls.client: - cls.client.close() - cls.client = None @classmethod def default_impala_client(cls, protocol): @@ -474,13 +481,13 @@ class ImpalaTestSuite(BaseTestSuite): raise Exception("unknown protocol: {0}".format(protocol)) @classmethod - def __get_default_host_port(cls, protocol): + def _get_default_port(cls, protocol): if protocol == BEESWAX: - return IMPALAD + return IMPALAD_BEESWAX_PORT elif protocol == HS2_HTTP: - return IMPALAD_HS2_HTTP_HOST_PORT + return IMPALAD_HS2_HTTP_PORT elif protocol == HS2: - return IMPALAD_HS2_HOST_PORT + return IMPALAD_HS2_PORT else: raise NotImplementedError("Not yet implemented: protocol=" + protocol) @@ -497,13 +504,8 @@ class ImpalaTestSuite(BaseTestSuite): raise NotImplementedError("Not yet implemented: protocol=" + protocol) @classmethod - def create_impala_service( - cls, host_port=IMPALAD, webserver_interface="", webserver_port=25000): - host, port = host_port.split(':') - if webserver_interface == "": - webserver_interface = host - return ImpaladService(host, beeswax_port=port, - webserver_interface=webserver_interface, webserver_port=webserver_port) + def create_impala_service(cls): + return ImpaladService(IMPALAD_HOSTNAME) @classmethod def create_hdfs_client(cls): @@ -773,7 +775,7 @@ class ImpalaTestSuite(BaseTestSuite): target_impalad_clients = list() if multiple_impalad: target_impalad_clients =\ - [ImpalaTestSuite.create_impala_client(host_port, protocol=protocol) + [self.create_impala_client(host_port, protocol=protocol) for host_port in self.__get_cluster_host_ports(protocol)] else: target_impalad_clients = [self.default_impala_client(protocol)] @@ -822,7 +824,7 @@ class ImpalaTestSuite(BaseTestSuite): Helper to execute a query block in Hive. No special handling of query options is done, since we use a separate session for each block. """ - h = ImpalaTestSuite.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2, + h = self.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2, is_hive=True) try: result = None @@ -1788,3 +1790,25 @@ class ImpalaTestSuite(BaseTestSuite): break properties[fields[1].rstrip()] = fields[2].rstrip() return properties + + # Checks if an Impala connection is functional. + @staticmethod + def check_connection(conn): + res = conn.execute("select 1 + 1") + assert res.data == ["2"] + + # Checks connections for all protocols. + def check_connections(cls, expected_count=3): + # default client must exist + cls.check_connection(cls.client) + count = 0 + if cls.beeswax_client: + cls.check_connection(cls.beeswax_client) + count += 1 + if cls.hs2_client: + cls.check_connection(cls.hs2_client) + count += 1 + if cls.hs2_http_client: + cls.check_connection(cls.hs2_http_client) + count += 1 + assert count == expected_count diff --git a/tests/common/network.py b/tests/common/network.py index 55502b4eb..7a2b11814 100644 --- a/tests/common/network.py +++ b/tests/common/network.py @@ -19,7 +19,9 @@ from __future__ import absolute_import, division, print_function import socket +import ssl +from tests.common.environ import IS_REDHAT_DERIVATIVE # Retrieves the host external IP rather than localhost/127.0.0.1 so we have an IP that # Impala will consider distinct from storage backends to force remote scheduling. @@ -30,3 +32,49 @@ def get_external_ip(): # Timeout=0 means it doesn't need to resolve. s.connect(('10.254.254.254', 1)) return s.getsockname()[0] + + +def split_host_port(host_port): + """Checks if the host name also contains a port and separates the two. + Returns either (host, None) or (host, port). Detects if host is an ipv6 address + like "[::]" and removes the brackets from it. + """ + is_ipv6_address = host_port[0] == "[" + if is_ipv6_address: + parts = host_port[1:].split("]") + if len(parts) == 1 or not parts[1]: + return (parts[0], None) + return (parts[0], int(parts[1][1:])) + else: + parts = host_port.split(":") + if len(parts) == 1: + return (parts[0], None) + return (parts[0], int(parts[1])) + + +def to_host_port(host, port): + is_ipv6_address = ":" in host + fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}" + return fmt.format(host, port) + + +CERT_TO_CA_MAP = { + "wildcard-cert.pem": "wildcardCA.pem", + "wildcard-san-cert.pem": "wildcardCA.pem" +} + +REQUIRED_MIN_OPENSSL_VERSION = 0x10001000 +# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5 +# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already +if IS_REDHAT_DERIVATIVE: + REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5) +else: + REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9) +_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None) +if _openssl_version_number is None: + SKIP_SSL_MSG = "Legacy OpenSSL module detected" +elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION: + SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % ( + ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION) +else: + SKIP_SSL_MSG = None diff --git a/tests/custom_cluster/test_client_ssl.py b/tests/custom_cluster/test_client_ssl.py index 832b547ae..09c462d97 100644 --- a/tests/custom_cluster/test_client_ssl.py +++ b/tests/custom_cluster/test_client_ssl.py @@ -24,34 +24,20 @@ import pytest import re import requests import signal -import ssl import socket import sys import time -from tests.common.environ import IS_REDHAT_DERIVATIVE + from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_service import ImpaladService +from tests.common.network import SKIP_SSL_MSG, REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 from tests.common.test_dimensions import create_client_protocol_dimension from tests.common.test_vector import BEESWAX from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expect, \ ImpalaShell, create_impala_shell_executable_dimension -REQUIRED_MIN_OPENSSL_VERSION = 0x10001000 -# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5 -# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already -if IS_REDHAT_DERIVATIVE: - REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5) -else: - REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9) -_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None) -if _openssl_version_number is None: - SKIP_SSL_MSG = "Legacy OpenSSL module detected" -elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION: - SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % ( - ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION) -else: - SKIP_SSL_MSG = None + CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME'] @@ -99,7 +85,6 @@ class TestClientSsl(CustomClusterTestSuite): @CustomClusterTestSuite.with_args(impalad_args=SSL_ARGS, statestored_args=SSL_ARGS, catalogd_args=SSL_ARGS) def test_ssl(self, vector): - self._verify_negative_cases(vector) # TODO: This is really two different tests, but the custom cluster takes too long to # start. Make it so that custom clusters can be specified across test suites. @@ -140,6 +125,7 @@ class TestClientSsl(CustomClusterTestSuite): print(result.stderr) assert "Query Status: Cancelled" in result.stdout assert impalad.wait_for_num_in_flight_queries(0) + self.check_connections() WEBSERVER_SSL_ARGS = ("--webserver_certificate_file=%(cert_dir)s/server-cert.pem " "--webserver_private_key_file=%(cert_dir)s/server-key.pem " @@ -217,6 +203,7 @@ class TestClientSsl(CustomClusterTestSuite): self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR, host="ip4.impala.test") + self.check_connections() @pytest.mark.execute_serially @CustomClusterTestSuite.with_args(impalad_args=SSL_WILDCARD_SAN_ARGS, @@ -225,7 +212,6 @@ class TestClientSsl(CustomClusterTestSuite): @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG) def test_wildcard_san_ssl(self, vector): """ Test for IMPALA-3159: Test with a certificate which has a wildcard as a SAN. """ - # This block of code is the same as _validate_positive_cases() but we want to check # if retrieving the SAN is supported first. args = ["--ssl", "-q", "select 1 + 2", "--ca_cert=%s/wildcardCA.pem" % CERT_DIR] @@ -239,6 +225,8 @@ class TestClientSsl(CustomClusterTestSuite): self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR, host="ip4.impala.test") + self.check_connections() + def _verify_negative_cases(self, vector, host=""): # Expect the shell to not start successfully if we point --ca_cert to an incorrect diff --git a/tests/custom_cluster/test_event_processing_error.py b/tests/custom_cluster/test_event_processing_error.py index 49d5e9b10..3942e2698 100644 --- a/tests/custom_cluster/test_event_processing_error.py +++ b/tests/custom_cluster/test_event_processing_error.py @@ -352,18 +352,18 @@ class TestEventProcessingError(CustomClusterTestSuite): replication tests """ # inserts on transactional tables - TestEventProcessingBase._run_test_insert_events_impl(unique_database, True) + TestEventProcessingBase._run_test_insert_events_impl(self, unique_database, True) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" try: test_db = unique_database + "_no_transact" self.run_stmt_in_hive("""create database {}""".format(test_db)) # inserts on external tables - TestEventProcessingBase._run_test_insert_events_impl(test_db, False) + TestEventProcessingBase._run_test_insert_events_impl(self, test_db, False) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" finally: self.run_stmt_in_hive("""drop database {} cascade""".format(test_db)) # replication related tests - TestEventProcessingBase._run_event_based_replication_tests_impl( + TestEventProcessingBase._run_event_based_replication_tests_impl(self, self.filesystem_client) assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" diff --git a/tests/custom_cluster/test_ipv6.py b/tests/custom_cluster/test_ipv6.py new file mode 100644 index 000000000..f4a0a9de5 --- /dev/null +++ b/tests/custom_cluster/test_ipv6.py @@ -0,0 +1,251 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from __future__ import absolute_import, division, print_function +import json +import logging +import os +import pytest +import requests + +from tests.common.custom_cluster_test_suite import CustomClusterTestSuite +from tests.common.network import SKIP_SSL_MSG +from tests.common.test_dimensions import create_client_protocol_dimension +from tests.shell.util import run_impala_shell_cmd, \ + create_impala_shell_executable_dimension +from tests.common.impala_connection import create_connection + +LOG = logging.getLogger('impala_test_suite') + +CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME'] + +# Use wildcard san cert to be flexible about host name. +SSL_WILDCARD_SAN_ARGS = ("--ssl_client_ca_certificate={0}/wildcardCA.pem " + "--ssl_server_certificate={0}/wildcard-san-cert.pem " + "--ssl_private_key={0}/wildcard-san-cert.key " + "--hostname={1} " + "--state_store_host={1} " + "--catalog_service_host={1} " + "--webserver_certificate_file={0}/wildcard-san-cert.pem " + "--webserver_private_key_file={0}/wildcard-san-cert.key " + ).format(CERT_DIR, "ip4.impala.test") + +WILDCARD_CA_CERT_PATH = "%s/wildcardCA.pem" % CERT_DIR + +IPV6_ONLY_IP_WEBSERBER_ARG = "--webserver_interface=::1 " +IPV6_DUAL_IP_WEBSERBER_ARG = "--webserver_interface=:: " +IPV6_ONLY_IP_QUERY_ARG = "--external_interface=::1 " +IPV6_DUAL_IP_QUERY_ARG = "--external_interface=:: " +IPV6_ONLY_IP_COORDINATOR_ARG = \ + "%s %s" % (IPV6_ONLY_IP_WEBSERBER_ARG, IPV6_ONLY_IP_QUERY_ARG) +IPV6_DUAL_IP_COORDINATOR_ARG = \ + "%s %s" % (IPV6_DUAL_IP_WEBSERBER_ARG, IPV6_DUAL_IP_QUERY_ARG) + +IPV6_ONLY_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip6.impala.test " +IPV6_DUAL_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip46.impala.test " +IPV6_ONLY_HOSTNAME_QUERY_ARG = "--external_interface=::1 " +IPV6_DUAL_HOSTNAME_QUERY_ARG = "--external_interface=:: " + +WEBUI_PORTS = [25000, 25010, 25020] + +# Error text can depend on both protocol and python version. +CONN_ERR = ["Could not connect", "Connection refused"] +CERT_ERR = ["doesn't match", "certificate verify failed"] +WEB_CERT_ERR = "CertificateError" + + +class TestIPv6Base(CustomClusterTestSuite): + ca_cert = None + + @classmethod + def setup_class(cls): + super(TestIPv6Base, cls).setup_class() + + @classmethod + def add_test_dimensions(cls): + super(TestIPv6Base, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension()) + cls.ImpalaTestMatrix.add_dimension(create_impala_shell_executable_dimension()) + + def _smoke(self, host, vector, expected_errors=[]): + proto = vector.get_value('protocol') + try: + port = self._get_default_port(proto) + host_port = "%s:%d" % (host, port) + use_ssl = self.ca_cert is not None + conn = create_connection(host_port, protocol=proto, use_ssl=use_ssl) + conn.connect() + assert not expected_errors + res = conn.execute("select 1") + assert res.data == ["1"] + except Exception as ex: + for err in expected_errors: + if err in str(ex): return + raise ex + + def _webui_smoke(self, url, err=None): + """Tests to check glibc version and locale is available""" + try: + if self.ca_cert: + other_info_page = requests.get(url + "/?json", verify=self.ca_cert).text + else: + other_info_page = requests.get(url + "/?json", verify=False).text + assert err is None + other_info = json.loads(other_info_page) + assert "glibc_version" in other_info + except Exception as ex: + if not err: raise ex + assert err in str(ex) + + def _shell_smoke(self, host, vector, expected_errors=[]): + proto = vector.get_value('protocol') + port = self._get_default_port(proto) + host_port = "%s:%d" % (host, port) + try: + shell_options = ["-i", host_port, "-q", "select 1"] + if self.ca_cert: + shell_options.append("--ssl") + shell_options.append("--ca_cert=" + self.ca_cert) + result = run_impala_shell_cmd(vector, shell_options) + assert not expected_errors + assert "Fetched 1 row" in result.stderr + except Exception as ex: + for err in expected_errors: + if err in str(ex): return + raise ex + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_DUAL_IP_WEBSERBER_ARG + + IPV6_DUAL_IP_QUERY_ARG, + statestored_args=IPV6_DUAL_IP_WEBSERBER_ARG, + catalogd_args=IPV6_DUAL_IP_WEBSERBER_ARG) +class TestIPv6DualNoSsl(TestIPv6Base): + + def test_ipv6_dual_no_ssl(self, vector): + for port in WEBUI_PORTS: + self._webui_smoke("http://127.0.0.1:%d" % port) + self._webui_smoke("http://[::1]:%d" % port) + self._webui_smoke("http://ip4.impala.test:%d" % port) + self._webui_smoke("http://ip6.impala.test:%d" % port) + self._webui_smoke("http://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector) + self._smoke("127.0.0.1", vector) + self._smoke("ip4.impala.test", vector) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector) + self._shell_smoke("127.0.0.1", vector) + self._shell_smoke("ip4.impala.test", vector) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_ONLY_IP_WEBSERBER_ARG + + IPV6_ONLY_IP_QUERY_ARG, + statestored_args=IPV6_ONLY_IP_WEBSERBER_ARG, + catalogd_args=IPV6_ONLY_IP_WEBSERBER_ARG) +class TestIPv6OnlyNoSsl(TestIPv6Base): + + def test_ipv6_only_no_ssl(self, vector): + self.check_connections() + for port in WEBUI_PORTS: + self._webui_smoke("http://127.0.0.1:%d" % port, err="Connection refused") + self._webui_smoke("http://[::1]:%d" % port) + self._webui_smoke("http://ip4.impala.test:%d" % port, err="Connection refused") + self._webui_smoke("http://ip6.impala.test:%d" % port) + self._webui_smoke("http://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector) + self._smoke("127.0.0.1", vector, CONN_ERR) + self._smoke("ip4.impala.test", vector, CONN_ERR) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector) + self._shell_smoke("127.0.0.1", vector, CONN_ERR) + self._shell_smoke("ip4.impala.test", vector, CONN_ERR) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + + IPV6_DUAL_HOSTNAME_QUERY_ARG + + SSL_WILDCARD_SAN_ARGS, + statestored_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS, + catalogd_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS) +class TestIPv6DualSsl(TestIPv6Base): + ca_cert = WILDCARD_CA_CERT_PATH + + @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG) + def test_ipv6_dual_ssl(self, vector): + self.check_connections() + for port in WEBUI_PORTS: + self._webui_smoke("https://127.0.0.1:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://[::1]:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://ip4.impala.test:%d" % port) + self._webui_smoke("https://ip6.impala.test:%d" % port) + self._webui_smoke("https://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector, CONN_ERR) + self._smoke("127.0.0.1", vector, CONN_ERR) + self._smoke("ip4.impala.test", vector) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector, CERT_ERR) + self._shell_smoke("127.0.0.1", vector, CERT_ERR) + self._shell_smoke("ip4.impala.test", vector) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) + + +@CustomClusterTestSuite.with_args(impalad_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + + IPV6_ONLY_HOSTNAME_QUERY_ARG + + SSL_WILDCARD_SAN_ARGS, + statestored_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS, + catalogd_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + + SSL_WILDCARD_SAN_ARGS) +class TestIPv6OnlySsl(TestIPv6Base): + ca_cert = WILDCARD_CA_CERT_PATH + + @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG) + def test_ipv6_only_ssl(self, vector): + self.check_connections() + for port in WEBUI_PORTS: + self._webui_smoke("https://127.0.0.1:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://[::1]:%d" % port, WEB_CERT_ERR) + self._webui_smoke("https://ip4.impala.test:%d" % port, "Connection refused") + self._webui_smoke("https://ip6.impala.test:%d" % port) + self._webui_smoke("https://ip46.impala.test:%d" % port) + + self._smoke("[::1]", vector, CONN_ERR) + self._smoke("127.0.0.1", vector, CONN_ERR) + self._smoke("ip4.impala.test", vector, CONN_ERR) + self._smoke("ip6.impala.test", vector) + self._smoke("ip46.impala.test", vector) + + self._shell_smoke("[::1]", vector, CERT_ERR) + self._shell_smoke("127.0.0.1", vector, CONN_ERR) + self._shell_smoke("ip4.impala.test", vector, CONN_ERR) + self._shell_smoke("ip6.impala.test", vector) + self._shell_smoke("ip46.impala.test", vector) diff --git a/tests/custom_cluster/test_redaction.py b/tests/custom_cluster/test_redaction.py index 7e4b2aad4..0ca333206 100644 --- a/tests/custom_cluster/test_redaction.py +++ b/tests/custom_cluster/test_redaction.py @@ -39,6 +39,10 @@ class TestRedaction(CustomClusterTestSuite): limited to table data and query text since queries may refer to table data. ''' + @classmethod + def setup_class(cls): + super(TestRedaction, cls).setup_class() + @property def log_dir(self): return os.path.join(self.tmp_dir, "logs") diff --git a/tests/metadata/test_event_processing.py b/tests/metadata/test_event_processing.py index 1b05c146a..767833304 100644 --- a/tests/metadata/test_event_processing.py +++ b/tests/metadata/test_event_processing.py @@ -35,10 +35,14 @@ PROCESSING_TIMEOUT_S = 10 LOG = logging.getLogger(__name__) @SkipIfFS.hive -class TestEventProcessing(ImpalaTestSuite): +class TestEventProcessing(TestEventProcessingBase): """This class contains tests that exercise the event processing mechanism in the catalog.""" + @classmethod + def setup_class(cls): + super(TestEventProcessing, cls).setup_class() + @classmethod def default_test_protocol(cls): return HS2 @@ -47,13 +51,13 @@ class TestEventProcessing(ImpalaTestSuite): def test_transactional_insert_events(self, unique_database): """Executes 'run_test_insert_events' for transactional tables. """ - TestEventProcessingBase._run_test_insert_events_impl( + TestEventProcessingBase._run_test_insert_events_impl(self, unique_database, is_transactional=True) def test_insert_events(self, unique_database): """Executes 'run_test_insert_events' for non-transactional tables. """ - TestEventProcessingBase._run_test_insert_events_impl(unique_database) + TestEventProcessingBase._run_test_insert_events_impl(self, unique_database) def test_iceberg_inserts(self): """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation @@ -99,13 +103,12 @@ class TestEventProcessing(ImpalaTestSuite): self._run_test_empty_partition_events(unique_database, False) def test_event_based_replication(self): - TestEventProcessingBase._run_event_based_replication_tests_impl( + self._run_event_based_replication_tests_impl(self, self.filesystem_client) def _run_test_empty_partition_events(self, unique_database, is_transactional): test_tbl = unique_database + ".test_events" - TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties( - is_transactional) + TBLPROPERTIES = self._get_transactional_tblproperties(is_transactional) self.run_stmt_in_hive("create table {0} (key string, value string) \ partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES)) self.client.set_configuration({ diff --git a/tests/metadata/test_event_processing_base.py b/tests/metadata/test_event_processing_base.py index 97725a2c0..baafd5358 100644 --- a/tests/metadata/test_event_processing_base.py +++ b/tests/metadata/test_event_processing_base.py @@ -27,17 +27,22 @@ EVENT_SYNC_QUERY_OPTIONS = { class TestEventProcessingBase(ImpalaTestSuite): @classmethod - def _run_test_insert_events_impl(cls, unique_database, is_transactional=False): + def setup_class(cls): + super(TestEventProcessingBase, cls).setup_class() + + @classmethod + def _run_test_insert_events_impl(cls, suite, unique_database, is_transactional=False): """Test for insert event processing. Events are created in Hive and processed in Impala. The following cases are tested : Insert into table --> for partitioned and non-partitioned table Insert overwrite table --> for partitioned and non-partitioned table Insert into partition --> for partitioned table """ - with cls.create_impala_client() as impala_client: + # TODO: change into an instance method and remove argument "suite" (IMPALA-14174) + with suite.create_impala_client() as impala_client: # Test table with no partitions. tbl_insert_nopart = 'tbl_insert_nopart' - cls.run_stmt_in_hive( + suite.run_stmt_in_hive( "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart)) tblproperties = "" if is_transactional: @@ -141,16 +146,18 @@ class TestEventProcessingBase(ImpalaTestSuite): assert len(result.data) == 0 @classmethod - def _run_event_based_replication_tests_impl(cls, filesystem_client, transactional=True): + def _run_event_based_replication_tests_impl(cls, suite, + filesystem_client, transactional=True): """Hive Replication relies on the insert events generated on the tables. This test issues some basic replication commands from Hive and makes sure that the replicated table has correct data.""" + # TODO: change into an instance method and remove argument "suite" (IMPALA-14174) TBLPROPERTIES = cls._get_transactional_tblproperties(transactional) source_db = ImpalaTestSuite.get_random_name("repl_source_") target_db = ImpalaTestSuite.get_random_name("repl_target_") unpartitioned_tbl = "unpart_tbl" partitioned_tbl = "part_tbl" - impala_client = cls.create_impala_client() + impala_client = suite.create_impala_client() try: cls.run_stmt_in_hive("create database {0}".format(source_db)) cls.run_stmt_in_hive( diff --git a/tests/stress/test_acid_stress.py b/tests/stress/test_acid_stress.py index 5809cae06..93f9d26a8 100644 --- a/tests/stress/test_acid_stress.py +++ b/tests/stress/test_acid_stress.py @@ -93,7 +93,7 @@ class TestAcidInsertsBasic(TestAcidStress): def _impala_role_write_inserts(self, tbl_name, partitioned): """INSERT INTO/OVERWRITE a table several times from Impala.""" try: - impalad_client = ImpalaTestSuite.create_impala_client() + impalad_client = self.create_impala_client() part_expr = "partition (p=1)" if partitioned else "" for run in range(0, NUM_OVERWRITES + 1): OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i) @@ -109,7 +109,7 @@ class TestAcidInsertsBasic(TestAcidStress): def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds): """SELECT from a table many times until the expected final values are found.""" try: - impalad_client = ImpalaTestSuite.create_impala_client() + impalad_client = self.create_impala_client() expected_result = {"run": -1, "i": 0} accept_empty_table = True while expected_result["run"] != NUM_OVERWRITES and \ @@ -182,7 +182,7 @@ class TestAcidInsertsBasic(TestAcidStress): def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec): insert_op = "OVERWRITE" if is_overwrite else "INTO" try: - impalad_client = ImpalaTestSuite.create_impala_client() + impalad_client = self.create_impala_client() impalad_client.execute( """insert {op} table {tbl_name} partition({partition}) select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name, diff --git a/tests/stress/test_insert_stress.py b/tests/stress/test_insert_stress.py index 5653cc9eb..235c92416 100644 --- a/tests/stress/test_insert_stress.py +++ b/tests/stress/test_insert_stress.py @@ -36,6 +36,10 @@ class TestInsertStress(ImpalaTestSuite): def get_workload(self): return 'targeted-stress' + @classmethod + def setup_class(cls): + super(TestInsertStress, cls).setup_class() + @classmethod def add_test_dimensions(cls): super(TestInsertStress, cls).add_test_dimensions() @@ -46,8 +50,8 @@ class TestInsertStress(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, wid, num_inserts, counter): """Writes ascending numbers up to 'num_inserts' into column 'i'. To column 'wid' it writes its identifier passed in parameter 'wid'.""" - target_impalad = wid % ImpalaTestSuite.get_impalad_cluster_size() - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = wid % self.get_impalad_cluster_size() + impalad_client = self.create_client_for_nth_impalad(target_impalad) try: insert_cnt = 0 while insert_cnt < num_inserts: @@ -72,8 +76,8 @@ class TestInsertStress(ImpalaTestSuite): assert sorted_run == list(range(sorted_run[0], sorted_run[-1] + 1)), \ "wid: %d" % wid - target_impalad = cid % ImpalaTestSuite.get_impalad_cluster_size() - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = cid % self.get_impalad_cluster_size() + impalad_client = self.create_client_for_nth_impalad(target_impalad) try: while counter.value != writers: result = impalad_client.execute("select * from %s" % tbl_name) diff --git a/tests/stress/test_merge_stress.py b/tests/stress/test_merge_stress.py index e948aa650..7bd34cb23 100644 --- a/tests/stress/test_merge_stress.py +++ b/tests/stress/test_merge_stress.py @@ -41,8 +41,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite): def _impala_role_concurrent_updater(self, tbl_name, col, num_writes): """Increments values in column 'total' and in the column which is passed in 'col'.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) merge_stmt = """merge into {0} target using {0} source on source.total = target.total when matched then update set @@ -61,8 +61,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, num_inserts): """Adds a new row based on the maximum 'total' value.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) merge_stmt = """merge into {0} target using (select total, a, b, c from {0} order by total desc limit 1) source on source.total +1 = target.total @@ -92,8 +92,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite): assert total == a + b + c return max_total - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) total = 0 while total < target_total: result = impalad_client.execute("select * from %s" % tbl_name) diff --git a/tests/stress/test_update_stress.py b/tests/stress/test_update_stress.py index 06579f7b8..5d76451af 100644 --- a/tests/stress/test_update_stress.py +++ b/tests/stress/test_update_stress.py @@ -82,8 +82,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, col, num_updates): """Increments values in column 'total' and in the column which is passed in 'col'.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) update_cnt = 0 while update_cnt < num_updates: try: @@ -107,8 +107,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite): assert total == a + b + c return total - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) total = 0 while total < target_total: result = impalad_client.execute("select * from %s" % tbl_name) @@ -163,8 +163,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows): """Deletes every row from the table one by one.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") i = 0 while i < num_rows: @@ -181,8 +181,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted): """Updates every row in the table in a loop.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") while all_rows_deleted.value != 1: try: @@ -196,8 +196,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted): """Optimizes the table in a loop.""" - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) impalad_client.set_configuration_option("SYNC_DDL", "true") while all_rows_deleted.value != 1: try: @@ -253,8 +253,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite): prev_j = j assert prev_id == num_rows - 1 - target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1) - impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad) + target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1) + impalad_client = self.create_client_for_nth_impalad(target_impalad) while all_rows_deleted.value != 1: result = impalad_client.execute("select * from %s order by id" % tbl_name) verify_result_set(result)