mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-13820: add ipv6 support for webui/hs2/hs2-http/beeswax
Main changes:
- added flag external_interface to override hostname for
beeswax/hs2/hs2-http port to allow testing ipv6 on these
interfaces without forcing ipv6 on internal communication
- compile Squeasel with USE_IPV6 to allow ipv6 on webui (webui
interface can be configured with existing flag webserver_interface)
- fixed the handling of [<ipv6addr>]:<port> style addresses in
impala-shell (e.g. [::1]:21050) and test framework
- improved handling of custom clusters in test framework to
allow webui/ImpalaTestSuite's clients to work with
non-standard settings (also fixes these clients with SSL)
Using ipv4 vs ipv6 vs dual stack can be configured by setting
the interface to bind to with flag webserver_interface and
external_interface. The Thrift server behind hs2/hs2-http/beeswax
only accepts a single host name and uses the first address
returned by getaddrinfo() that it can successfully bind to. This
means that unless an ipv6 address is used (like ::1) the behavior
will depend on the order of addresses returned by getaddrinfo():
63b7a263fc/lib/cpp/src/thrift/transport/TServerSocket.cpp (L481)
For dual stack the only way currently is to bind to "::",
as the Thrift server can only listen on a single socket.
Testing:
- added custom cluster tests for ipv6 only/dual interface
with and without SSL
- manually tested in dual stack environment with client on a
different host
- among clients impala-shell and impyla are tested, but not
JDBC/ODBC
- no tests yet on a truly ipv6-only environment, as internal
communication (e.g. krpc) is not ready for ipv6
To test manually the dev cluster can be started with ipv6 support:
dual mode:
bin/start-impala-cluster.py --impalad_args="--external_interface=:: --webserver_interface=::" --catalogd_args="--webserver_interface=::" --state_store_args="--webserver_interface=::"
ipv6 only:
bin/start-impala-cluster.py --impalad_args="--external_interface=::1 --webserver_interface=::1" --catalogd_args="--webserver_interface=::1" --state_store_args="--webserver_interface=::1"
Change-Id: I51ac66c568cc9bb06f4a3915db07a53c100109b6
Reviewed-on: http://gerrit.cloudera.org:8080/22527
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
892b33f35d
commit
5cca1aa9e5
@@ -270,9 +270,10 @@ ThriftServer::ThriftServer(const string& name,
|
||||
const std::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
|
||||
MetricGroup* metrics, int max_concurrent_connections, int64_t queue_timeout_ms,
|
||||
int64_t idle_poll_period_ms, TransportType transport_type,
|
||||
bool is_external_facing)
|
||||
bool is_external_facing, string host)
|
||||
: started_(false),
|
||||
port_(port),
|
||||
host_(std::move(host)),
|
||||
ssl_enabled_(false),
|
||||
max_concurrent_connections_(max_concurrent_connections),
|
||||
queue_timeout_ms_(queue_timeout_ms),
|
||||
@@ -342,7 +343,7 @@ Status ThriftServer::CreateSocket(std::shared_ptr<TServerSocket>* socket) {
|
||||
socket_factory->loadCertificate(certificate_path_.c_str());
|
||||
socket_factory->loadPrivateKey(private_key_path_.c_str());
|
||||
ImpalaKeepAliveServerSocket<TSSLServerSocket>* server_socket =
|
||||
new ImpalaKeepAliveServerSocket<TSSLServerSocket>(port_, socket_factory);
|
||||
new ImpalaKeepAliveServerSocket<TSSLServerSocket>(host_, port_, socket_factory);
|
||||
server_socket->setKeepAliveOptions(keepalive_probe_period_s_,
|
||||
keepalive_retry_period_s_, keepalive_retry_count_);
|
||||
socket->reset(server_socket);
|
||||
@@ -351,7 +352,7 @@ Status ThriftServer::CreateSocket(std::shared_ptr<TServerSocket>* socket) {
|
||||
}
|
||||
} else {
|
||||
ImpalaKeepAliveServerSocket<TServerSocket>* server_socket =
|
||||
new ImpalaKeepAliveServerSocket<TServerSocket>(port_);
|
||||
new ImpalaKeepAliveServerSocket<TServerSocket>(host_, port_);
|
||||
server_socket->setKeepAliveOptions(keepalive_probe_period_s_,
|
||||
keepalive_retry_period_s_, keepalive_retry_count_);
|
||||
socket->reset(server_socket);
|
||||
|
||||
@@ -309,7 +309,7 @@ class ThriftServer {
|
||||
int max_concurrent_connections = 0, int64_t queue_timeout_ms = 0,
|
||||
int64_t idle_poll_period_ms = 0,
|
||||
TransportType server_transport = TransportType::BINARY,
|
||||
bool is_external_facing = true);
|
||||
bool is_external_facing = true, std::string host = "");
|
||||
|
||||
/// Enables secure access over SSL. Must be called before Start(). The first three
|
||||
/// arguments are the minimum SSL/TLS version, and paths to certificate and private key
|
||||
@@ -345,6 +345,9 @@ class ThriftServer {
|
||||
/// replaced with whatever port number the server is listening on.
|
||||
int port_;
|
||||
|
||||
/// The host name to bind with.
|
||||
string host_;
|
||||
|
||||
/// True if the server socket only accepts SSL connections
|
||||
bool ssl_enabled_;
|
||||
|
||||
@@ -546,6 +549,11 @@ class ThriftServerBuilder {
|
||||
return *this;
|
||||
}
|
||||
|
||||
ThriftServerBuilder& host(const string& host) {
|
||||
host_ = host;
|
||||
return *this;
|
||||
}
|
||||
|
||||
/// Constructs a new ThriftServer and puts it in 'server', if construction was
|
||||
/// successful, returns an error otherwise. In the error case, 'server' will not have
|
||||
/// been set and will not need to be freed, otherwise the caller assumes ownership of
|
||||
@@ -554,7 +562,7 @@ class ThriftServerBuilder {
|
||||
std::unique_ptr<ThriftServer> ptr(
|
||||
new ThriftServer(name_, processor_, port_, auth_provider_, metrics_,
|
||||
max_concurrent_connections_, queue_timeout_ms_, idle_poll_period_ms_,
|
||||
server_transport_type_, is_external_facing_));
|
||||
server_transport_type_, is_external_facing_, host_));
|
||||
if (enable_ssl_) {
|
||||
RETURN_IF_ERROR(ptr->EnableSsl(
|
||||
version_, certificate_, private_key_, pem_password_cmd_, cipher_list_,
|
||||
@@ -572,6 +580,7 @@ class ThriftServerBuilder {
|
||||
int max_concurrent_connections_ = 0;
|
||||
std::string name_;
|
||||
std::shared_ptr<apache::thrift::TProcessor> processor_;
|
||||
std::string host_;
|
||||
int port_ = 0;
|
||||
ThriftServer::TransportType server_transport_type_ =
|
||||
ThriftServer::TransportType::BINARY;
|
||||
|
||||
@@ -157,6 +157,9 @@ DEFINE_bool(enable_external_fe_http, false,
|
||||
"if true enables http transport for external_fe_port otherwise binary transport is "
|
||||
"used");
|
||||
|
||||
DEFINE_string(external_interface, "",
|
||||
"Host name to bind with in beeswax/hs2/hs2-http. \"::\" allows IPv6 (dual stack).");
|
||||
|
||||
DEFINE_int32(fe_service_threads, 64,
|
||||
"number of threads available to serve client requests");
|
||||
DEFINE_string(default_query_options, "", "key=value pair of default query options for"
|
||||
@@ -3238,6 +3241,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
|
||||
.keepalive(FLAGS_client_keepalive_probe_period_s,
|
||||
FLAGS_client_keepalive_retry_period_s,
|
||||
FLAGS_client_keepalive_retry_count)
|
||||
.host(FLAGS_external_interface)
|
||||
.Build(&server));
|
||||
beeswax_server_.reset(server);
|
||||
beeswax_server_->SetConnectionHandler(this);
|
||||
@@ -3270,6 +3274,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
|
||||
.keepalive(FLAGS_client_keepalive_probe_period_s,
|
||||
FLAGS_client_keepalive_retry_period_s,
|
||||
FLAGS_client_keepalive_retry_count)
|
||||
.host(FLAGS_external_interface)
|
||||
.Build(&server));
|
||||
hs2_server_.reset(server);
|
||||
hs2_server_->SetConnectionHandler(this);
|
||||
@@ -3305,6 +3310,8 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
|
||||
FLAGS_client_keepalive_retry_period_s,
|
||||
FLAGS_client_keepalive_retry_count)
|
||||
.Build(&server));
|
||||
// FLAGS_external_interface is not passed to external_fe_port. If this is
|
||||
// needed (e.g. for dual stack) then another subtask can be added to IMPALA-13819.
|
||||
external_fe_server_.reset(server);
|
||||
external_fe_server_->SetConnectionHandler(this);
|
||||
}
|
||||
@@ -3338,6 +3345,7 @@ Status ImpalaServer::Start(int32_t beeswax_port, int32_t hs2_port,
|
||||
.keepalive(FLAGS_client_keepalive_probe_period_s,
|
||||
FLAGS_client_keepalive_retry_period_s,
|
||||
FLAGS_client_keepalive_retry_count)
|
||||
.host(FLAGS_external_interface)
|
||||
.Build(&http_server));
|
||||
hs2_http_server_.reset(http_server);
|
||||
hs2_http_server_->SetConnectionHandler(this);
|
||||
|
||||
@@ -163,6 +163,8 @@ add_dependencies(Util gen-deps)
|
||||
# Squeasel requires C99 compatibility to build.
|
||||
SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c
|
||||
PROPERTIES COMPILE_FLAGS -std=c99)
|
||||
SET_SOURCE_FILES_PROPERTIES(${SQUEASEL_SRC_DIR}/squeasel.c
|
||||
PROPERTIES COMPILE_FLAGS -DUSE_IPV6)
|
||||
|
||||
# shared library which provides native logging support to JVMs over JNI.
|
||||
add_library(loggingsupport SHARED
|
||||
|
||||
@@ -53,6 +53,7 @@ using std::random_device;
|
||||
namespace impala {
|
||||
|
||||
const string LOCALHOST_IP_STR("127.0.0.1");
|
||||
const string LOCALHOST_IP_V6_STR("::1");
|
||||
|
||||
Status GetHostname(string* hostname) {
|
||||
char name[HOST_NAME_MAX];
|
||||
@@ -67,33 +68,44 @@ Status GetHostname(string* hostname) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){
|
||||
Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip, bool ipv6){
|
||||
// Try to resolve via the operating system.
|
||||
vector<IpAddr> addresses;
|
||||
addrinfo hints;
|
||||
memset(&hints, 0, sizeof(struct addrinfo));
|
||||
hints.ai_family = AF_INET; // IPv4 addresses only
|
||||
hints.ai_family = ipv6 ? AF_INET6 : AF_INET;
|
||||
hints.ai_socktype = SOCK_STREAM;
|
||||
|
||||
struct addrinfo* addr_info;
|
||||
if (getaddrinfo(hostname.c_str(), NULL, &hints, &addr_info) != 0) {
|
||||
stringstream ss;
|
||||
ss << "Could not find IPv4 address for: " << hostname;
|
||||
ss << "Could not find IPv" << (ipv6 ? 6 : 4) << " address for: " << hostname;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
addrinfo* it = addr_info;
|
||||
while (it != NULL) {
|
||||
char addr_buf[64];
|
||||
const char* result =
|
||||
inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr, addr_buf, 64);
|
||||
char addr_buf[INET6_ADDRSTRLEN];
|
||||
const char* result = nullptr;
|
||||
bool is_ipv6_addr = it->ai_family == AF_INET6;
|
||||
if (is_ipv6_addr) {
|
||||
result = inet_ntop(AF_INET6, &((sockaddr_in6*)it->ai_addr)->sin6_addr,
|
||||
addr_buf, sizeof(addr_buf));
|
||||
} else {
|
||||
result = inet_ntop(AF_INET, &((sockaddr_in*)it->ai_addr)->sin_addr,
|
||||
addr_buf, sizeof(addr_buf));
|
||||
}
|
||||
|
||||
if (result == NULL) {
|
||||
stringstream ss;
|
||||
ss << "Could not convert IPv4 address for: " << hostname;
|
||||
ss << "Could not convert IPv" << (is_ipv6_addr ? 6: 4)
|
||||
<< "address for: " << hostname;
|
||||
freeaddrinfo(addr_info);
|
||||
return Status(ss.str());
|
||||
}
|
||||
addresses.push_back(string(addr_buf));
|
||||
if (is_ipv6_addr == ipv6) {
|
||||
addresses.push_back(string(addr_buf));
|
||||
}
|
||||
it = it->ai_next;
|
||||
}
|
||||
|
||||
@@ -101,7 +113,7 @@ Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip){
|
||||
|
||||
if (addresses.empty()) {
|
||||
stringstream ss;
|
||||
ss << "Could not convert IPv4 address for: " << hostname;
|
||||
ss << "Could not convert IPv" << (ipv6 ? 6 : 4) << " address for: " << hostname;
|
||||
return Status(ss.str());
|
||||
}
|
||||
|
||||
@@ -130,7 +142,7 @@ bool IsResolvedAddress(const NetworkAddressPB& addr) {
|
||||
|
||||
bool FindFirstNonLocalhost(const vector<string>& addresses, string* addr) {
|
||||
for (const string& candidate: addresses) {
|
||||
if (candidate != LOCALHOST_IP_STR) {
|
||||
if (candidate != LOCALHOST_IP_STR && candidate != LOCALHOST_IP_V6_STR) {
|
||||
*addr = candidate;
|
||||
return true;
|
||||
}
|
||||
@@ -226,7 +238,13 @@ bool IsWildcardAddress(const string& ipaddress) {
|
||||
|
||||
string TNetworkAddressToString(const TNetworkAddress& address) {
|
||||
stringstream ss;
|
||||
ss << address.hostname << ":" << dec << address.port;
|
||||
if (address.hostname.find(':') == string::npos) {
|
||||
// IPv4
|
||||
ss << address.hostname << ":" << dec << address.port;
|
||||
} else {
|
||||
// IPv6
|
||||
ss << "[" << address.hostname << "]:" << dec << address.port;
|
||||
}
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
|
||||
@@ -42,9 +42,11 @@ bool IsResolvedAddress(const NetworkAddressPB& addr);
|
||||
|
||||
/// Looks up all IP addresses associated with a given hostname and returns one of them via
|
||||
/// 'ip'. If the IP addresses of a host don't change, then subsequent calls will
|
||||
/// always return the same address. Returns an error status if any system call failed,
|
||||
/// otherwise OK. Even if OK is returned, addresses may still be of zero length.
|
||||
Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) WARN_UNUSED_RESULT;
|
||||
/// always return the same address. Returns an error status if any system call failed or
|
||||
/// no address was found, otherwise OK.
|
||||
/// Returns only ipv6 addresses if 'ipv6' is true, otherwise only ipv4 addresses.
|
||||
Status HostnameToIpAddr(
|
||||
const Hostname& hostname, IpAddr* ip, bool ipv6=false) WARN_UNUSED_RESULT;
|
||||
|
||||
/// Finds the first non-localhost IP address in the given list. Returns
|
||||
/// true if such an address was found, false otherwise.
|
||||
|
||||
@@ -408,17 +408,25 @@ bool Webserver::IsSecure() const {
|
||||
Status Webserver::Start() {
|
||||
LOG(INFO) << "Starting webserver on " << TNetworkAddressToString(http_address_);
|
||||
|
||||
IpAddr ip;
|
||||
RETURN_IF_ERROR(HostnameToIpAddr(http_address_.hostname, &ip));
|
||||
stringstream listening_spec;
|
||||
listening_spec << ip << ":" << http_address_.port;
|
||||
IpAddr ipv4, ipv6;
|
||||
Status ip_v6_status = HostnameToIpAddr(http_address_.hostname, &ipv6, true);
|
||||
Status ip_v4_status = HostnameToIpAddr(http_address_.hostname, &ipv4, false);
|
||||
if (!ip_v4_status.ok() && !ip_v6_status.ok()) return ip_v6_status;
|
||||
|
||||
if (IsSecure()) {
|
||||
LOG(INFO) << "Webserver: Enabling HTTPS support";
|
||||
// Squeasel makes sockets with 's' suffixes accept SSL traffic only
|
||||
listening_spec << "s";
|
||||
if (IsSecure()) LOG(INFO) << "Webserver: Enabling HTTPS support";
|
||||
stringstream listening_spec;
|
||||
if (ip_v6_status.ok()) {
|
||||
listening_spec << "[" << ipv6 << "]:" << http_address_.port;
|
||||
if (IsSecure()) listening_spec << "s";
|
||||
}
|
||||
if (ip_v4_status.ok()) {
|
||||
if (ip_v6_status.ok()) listening_spec << ",";
|
||||
listening_spec << ipv4 << ":" << http_address_.port;
|
||||
if (IsSecure()) listening_spec << "s";
|
||||
}
|
||||
string listening_str = listening_spec.str();
|
||||
LOG(INFO) << "Starting webserver listening to " << listening_str;
|
||||
|
||||
vector<const char*> options;
|
||||
|
||||
if (!FLAGS_webserver_doc_root.empty() && FLAGS_enable_webserver_doc_root) {
|
||||
|
||||
@@ -429,8 +429,7 @@ class ImpalaClient(object):
|
||||
# symptoms in case of a problematic remote endpoint. It's better to have a finite
|
||||
# timeout so that in case of any connection errors, the client retries have a better
|
||||
# chance of succeeding.
|
||||
|
||||
host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port)
|
||||
host_and_port = self._to_host_port(self.impalad_host, self.impalad_port)
|
||||
assert self.http_path
|
||||
# ImpalaHttpClient relies on the URI scheme (http vs https) to open an appropriate
|
||||
# connection to the server.
|
||||
@@ -582,6 +581,13 @@ class ImpalaClient(object):
|
||||
num_deleted_rows = sum([int(k) for k in dml_result.rows_deleted.values()])
|
||||
return (num_rows, num_deleted_rows, dml_result.num_row_errors)
|
||||
|
||||
@staticmethod
|
||||
def _to_host_port(host, port):
|
||||
# Wrap ipv6 addresses in brackets.
|
||||
is_ipv6_address = ":" in host
|
||||
fmt = "[{0}]:{1}" if is_ipv6_address else "{0}:{1}"
|
||||
return fmt.format(host, port)
|
||||
|
||||
|
||||
class ImpalaHS2Client(ImpalaClient):
|
||||
"""Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
|
||||
|
||||
@@ -1001,6 +1001,31 @@ class ImpalaShell(cmd.Cmd, object):
|
||||
"""Exit the impala shell"""
|
||||
return self.do_quit(args)
|
||||
|
||||
@staticmethod
|
||||
def __parse_host_port(addr):
|
||||
"""Checks if the host name also contains a port and separates the two.
|
||||
Returns either [host] or [host, port]. Detects if host is an ipv6 address like "[::]"
|
||||
and removes the brackets from it.
|
||||
"""
|
||||
split_by_colon = addr.split(':')
|
||||
ipv6addr = len(split_by_colon) > 2
|
||||
host_port = None
|
||||
if ipv6addr:
|
||||
if addr[0] == "[":
|
||||
parts = addr.split("]")
|
||||
host_port = [parts[0][1:]]
|
||||
has_port = parts[1] != ""
|
||||
if has_port:
|
||||
if parts[1][0] != ":": return None
|
||||
host_port.append(parts[1][1:])
|
||||
else:
|
||||
host_port = [addr]
|
||||
else:
|
||||
host_port = [val for val in split_by_colon if val.strip()]
|
||||
# validate the connection string.
|
||||
if ':' in addr and len(host_port) != 2: return None
|
||||
return host_port
|
||||
|
||||
def do_connect(self, args):
|
||||
"""Connect to an Impalad instance:
|
||||
Usage: connect, defaults to the fqdn of the localhost and the protocol's default port
|
||||
@@ -1023,10 +1048,10 @@ class ImpalaShell(cmd.Cmd, object):
|
||||
|
||||
if not args: args = socket.getfqdn()
|
||||
tokens = args.split(" ")
|
||||
# validate the connection string.
|
||||
host_port = [val for val in tokens[0].split(':') if val.strip()]
|
||||
addr = tokens[0]
|
||||
host_port = self.__parse_host_port(addr)
|
||||
protocol = options.protocol.lower()
|
||||
if (':' in tokens[0] and len(host_port) != 2):
|
||||
if not host_port:
|
||||
print("Connection string must either be empty, or of the form "
|
||||
"<hostname[:port]>", file=sys.stderr)
|
||||
return CmdStatus.ERROR
|
||||
|
||||
@@ -35,6 +35,7 @@ import sys
|
||||
import time
|
||||
|
||||
from builtins import filter, map
|
||||
|
||||
from thrift.protocol import TBinaryProtocol
|
||||
from thrift.Thrift import TApplicationException
|
||||
from thrift.transport.TTransport import TTransportException
|
||||
@@ -42,6 +43,7 @@ from thrift.transport.TTransport import TTransportException
|
||||
from impala_thrift_gen.beeswax import BeeswaxService
|
||||
from impala_thrift_gen.beeswax.BeeswaxService import QueryState
|
||||
from impala_thrift_gen.ImpalaService import ImpalaService
|
||||
from tests.common.network import split_host_port
|
||||
from tests.util.thrift_util import create_transport
|
||||
|
||||
LOG = logging.getLogger('impala_beeswax')
|
||||
@@ -115,12 +117,9 @@ class ImpalaBeeswaxClient(object):
|
||||
def __init__(self, impalad, use_kerberos=False, user=None, password=None,
|
||||
use_ssl=False):
|
||||
self.connected = False
|
||||
split_impalad = impalad.split(":")
|
||||
assert len(split_impalad) in [1, 2]
|
||||
self.impalad_host = split_impalad[0]
|
||||
self.impalad_port = 21000 # Default beeswax port
|
||||
if len(split_impalad) == 2:
|
||||
self.impalad_port = int(split_impalad[1])
|
||||
host, port = split_host_port(impalad)
|
||||
self.impalad_host = host
|
||||
self.impalad_port = port if port else 21000 # Default beeswax port
|
||||
self.imp_service = None
|
||||
self.transport = None
|
||||
self.use_kerberos = use_kerberos
|
||||
@@ -170,7 +169,7 @@ class ImpalaBeeswaxClient(object):
|
||||
|
||||
def close_connection(self):
|
||||
"""Close the transport if it's still open"""
|
||||
if self.transport:
|
||||
if self.transport and self.connected:
|
||||
self.transport.close()
|
||||
self.connected = False
|
||||
|
||||
|
||||
@@ -614,6 +614,7 @@ class CustomClusterTestSuite(ImpalaTestSuite):
|
||||
|
||||
# Failure tests expect cluster to be initialised even if start-impala-cluster fails.
|
||||
cls.cluster = ImpalaCluster.get_e2e_test_cluster()
|
||||
cls.impalad_test_service = cls.create_impala_service()
|
||||
|
||||
PREVIOUS_CMD_STR = cmd_str
|
||||
|
||||
@@ -650,3 +651,9 @@ class CustomClusterTestSuite(ImpalaTestSuite):
|
||||
for impalad in cls.cluster.impalads:
|
||||
impalad.service.wait_for_num_known_live_backends(expected_num_impalads,
|
||||
timeout=impalad_timeout_s)
|
||||
|
||||
@classmethod
|
||||
def create_impala_service(cls):
|
||||
"""Override ImpalaTestSuite to return 1st impalad of custom cluster.
|
||||
Returns None if no impalad was started."""
|
||||
return cls.cluster.impalads[0].service if cls.cluster.impalads else None
|
||||
|
||||
@@ -570,14 +570,21 @@ class BaseImpalaProcess(Process):
|
||||
|
||||
def _get_webserver_certificate_file(self):
|
||||
# TODO: if this is containerised, the path will likely not be the same on the host.
|
||||
# TODO: what we need in the client is the CA, not the server cert
|
||||
return self._get_arg_value("webserver_certificate_file", "")
|
||||
|
||||
def _get_ssl_client_ca_certificate(self):
|
||||
return self._get_arg_value("ssl_client_ca_certificate", "")
|
||||
|
||||
def _get_hostname(self):
|
||||
return self._get_arg_value("hostname", socket.gethostname())
|
||||
|
||||
def _get_webserver_interface(self):
|
||||
return self._get_arg_value("webserver_interface", socket.gethostname())
|
||||
|
||||
def _get_external_interface(self):
|
||||
return self._get_arg_value("external_interface", socket.gethostname())
|
||||
|
||||
def _get_arg_value(self, arg_name, default=None):
|
||||
"""Gets the argument value for given argument name"""
|
||||
for arg in self.cmd:
|
||||
@@ -600,10 +607,12 @@ class BaseImpalaProcess(Process):
|
||||
class ImpaladProcess(BaseImpalaProcess):
|
||||
def __init__(self, cmd, container_id=None, port_map=None):
|
||||
super(ImpaladProcess, self).__init__(cmd, container_id, port_map)
|
||||
self.external_interface = self._get_external_interface()
|
||||
self.service = ImpaladService(self.hostname, self.webserver_interface,
|
||||
self.external_interface,
|
||||
self.get_webserver_port(), self.__get_beeswax_port(),
|
||||
self.__get_krpc_port(), self.__get_hs2_port(), self.__get_hs2_http_port(),
|
||||
self._get_webserver_certificate_file())
|
||||
self._get_webserver_certificate_file(), self._get_ssl_client_ca_certificate())
|
||||
|
||||
def _get_default_webserver_port(self):
|
||||
return DEFAULT_IMPALAD_WEBSERVER_PORT
|
||||
@@ -685,7 +694,7 @@ class StateStoreProcess(BaseImpalaProcess):
|
||||
super(StateStoreProcess, self).__init__(cmd, container_id, port_map)
|
||||
self.service = StateStoredService(self.hostname, self.webserver_interface,
|
||||
self.get_webserver_port(), self._get_webserver_certificate_file(),
|
||||
self.__get_port())
|
||||
self._get_ssl_client_ca_certificate(), self.__get_port())
|
||||
|
||||
def _get_default_webserver_port(self):
|
||||
return DEFAULT_STATESTORED_WEBSERVER_PORT
|
||||
@@ -712,6 +721,7 @@ class CatalogdProcess(BaseImpalaProcess):
|
||||
super(CatalogdProcess, self).__init__(cmd, container_id, port_map)
|
||||
self.service = CatalogdService(self.hostname, self.webserver_interface,
|
||||
self.get_webserver_port(), self._get_webserver_certificate_file(),
|
||||
self._get_ssl_client_ca_certificate(),
|
||||
self.__get_port())
|
||||
|
||||
def _get_default_webserver_port(self):
|
||||
@@ -743,7 +753,8 @@ class AdmissiondProcess(BaseImpalaProcess):
|
||||
def __init__(self, cmd, container_id=None, port_map=None):
|
||||
super(AdmissiondProcess, self).__init__(cmd, container_id, port_map)
|
||||
self.service = AdmissiondService(self.hostname, self.webserver_interface,
|
||||
self.get_webserver_port(), self._get_webserver_certificate_file())
|
||||
self.get_webserver_port(), self._get_webserver_certificate_file(),
|
||||
self._get_ssl_client_ca_certificate())
|
||||
|
||||
def _get_default_webserver_port(self):
|
||||
return DEFAULT_ADMISSIOND_WEBSERVER_PORT
|
||||
|
||||
@@ -40,6 +40,7 @@ from tests.beeswax.impala_beeswax import (
|
||||
ImpalaBeeswaxException,
|
||||
)
|
||||
import tests.common
|
||||
from tests.common.network import split_host_port
|
||||
from tests.common.patterns import LOG_FORMAT
|
||||
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
|
||||
from tests.util.thrift_util import op_handle_to_query_id, session_handle_to_session_id
|
||||
@@ -610,13 +611,13 @@ class ImpylaHS2Connection(ImpalaConnection):
|
||||
return self.__cursor
|
||||
|
||||
def connect(self):
|
||||
host, port = self.__host_port.split(":")
|
||||
host, port = split_host_port(self.__host_port)
|
||||
conn_kwargs = {}
|
||||
if self._is_hive:
|
||||
conn_kwargs['auth_mechanism'] = 'PLAIN'
|
||||
try:
|
||||
self.__impyla_conn = impyla.connect(
|
||||
host=host, port=int(port), use_http_transport=self.__use_http_transport,
|
||||
host=host, port=port, use_http_transport=self.__use_http_transport,
|
||||
http_path=self.__http_path, use_ssl=self.__use_ssl, **conn_kwargs)
|
||||
self.log_client("connected to {0} with impyla {1}".format(
|
||||
self.__host_port, self.get_test_protocol()))
|
||||
|
||||
@@ -35,6 +35,7 @@ from thrift.transport.TSocket import TSocket
|
||||
from thrift.transport.TTransport import TBufferedTransport
|
||||
|
||||
from tests.common.impala_connection import create_connection, create_ldap_connection
|
||||
from tests.common.network import to_host_port, CERT_TO_CA_MAP
|
||||
from tests.common.test_vector import BEESWAX, HS2, HS2_HTTP
|
||||
|
||||
LOG = logging.getLogger('impala_service')
|
||||
@@ -48,7 +49,7 @@ WEBSERVER_PASSWORD = os.environ.get('IMPALA_WEBSERVER_PASSWORD', None)
|
||||
# TODO: Refactor the retry/timeout logic into a common place.
|
||||
class BaseImpalaService(object):
|
||||
def __init__(self, hostname, webserver_interface, webserver_port,
|
||||
webserver_certificate_file):
|
||||
webserver_certificate_file, ssl_client_ca_certificate_file):
|
||||
self.hostname = hostname
|
||||
self.webserver_interface = webserver_interface
|
||||
if webserver_interface == "":
|
||||
@@ -56,6 +57,7 @@ class BaseImpalaService(object):
|
||||
self.webserver_interface = hostname
|
||||
self.webserver_port = webserver_port
|
||||
self.webserver_certificate_file = webserver_certificate_file
|
||||
self.ssl_client_ca_certificate_file = ssl_client_ca_certificate_file
|
||||
self.webserver_username_password = None
|
||||
if WEBSERVER_USERNAME is not None and WEBSERVER_PASSWORD is not None:
|
||||
self.webserver_username_password = (WEBSERVER_USERNAME, WEBSERVER_PASSWORD)
|
||||
@@ -68,10 +70,14 @@ class BaseImpalaService(object):
|
||||
protocol = "http"
|
||||
if self.webserver_certificate_file != "":
|
||||
protocol = "https"
|
||||
url = "%s://%s:%d/%s" % \
|
||||
(protocol, self.webserver_interface, int(self.webserver_port), page_name)
|
||||
return requests.get(url, verify=self.webserver_certificate_file,
|
||||
auth=self.webserver_username_password)
|
||||
host_port = to_host_port(self.webserver_interface, self.webserver_port)
|
||||
url = "%s://%s/%s" % (protocol, host_port, page_name)
|
||||
cert = self.webserver_certificate_file
|
||||
# Instead of cert use its CA cert if available.
|
||||
file_part = cert.split("/")[-1]
|
||||
if file_part in CERT_TO_CA_MAP:
|
||||
cert = cert.replace(file_part, CERT_TO_CA_MAP[file_part])
|
||||
return requests.get(url, verify=cert, auth=self.webserver_username_password)
|
||||
except Exception as e:
|
||||
LOG.info("Debug webpage not yet available: %s", str(e))
|
||||
sleep(interval)
|
||||
@@ -260,11 +266,14 @@ class BaseImpalaService(object):
|
||||
# Allows for interacting with an Impalad instance to perform operations such as creating
|
||||
# new connections or accessing the debug webpage.
|
||||
class ImpaladService(BaseImpalaService):
|
||||
def __init__(self, hostname, webserver_interface="", webserver_port=25000,
|
||||
beeswax_port=21000, krpc_port=27000, hs2_port=21050,
|
||||
hs2_http_port=28000, webserver_certificate_file=""):
|
||||
def __init__(self, hostname, webserver_interface="", external_interface="",
|
||||
webserver_port=25000, beeswax_port=21000, krpc_port=27000, hs2_port=21050,
|
||||
hs2_http_port=28000, webserver_certificate_file="",
|
||||
ssl_client_ca_certificate_file=""):
|
||||
super(ImpaladService, self).__init__(
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file)
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file,
|
||||
ssl_client_ca_certificate_file)
|
||||
self.external_interface = external_interface if external_interface else hostname
|
||||
self.beeswax_port = beeswax_port
|
||||
self.krpc_port = krpc_port
|
||||
self.hs2_port = hs2_port
|
||||
@@ -444,30 +453,32 @@ class ImpaladService(BaseImpalaService):
|
||||
sleep(interval)
|
||||
return False
|
||||
|
||||
def is_port_open(self, port):
|
||||
def use_ssl_for_clients(self):
|
||||
return self.ssl_client_ca_certificate_file != ""
|
||||
|
||||
def is_port_open(self, host, port):
|
||||
try:
|
||||
sock = socket.create_connection((self.hostname, port), timeout=1)
|
||||
sock = socket.create_connection((host, port), timeout=1)
|
||||
sock.close()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def webserver_port_is_open(self):
|
||||
return self.is_port_open(self.webserver_port)
|
||||
return self.is_port_open(self.webserver_interface, self.webserver_port)
|
||||
|
||||
def create_beeswax_client(self, use_kerberos=False):
|
||||
"""Creates a new beeswax client connection to the impalad.
|
||||
DEPRECATED: Use create_hs2_client() instead."""
|
||||
LOG.warning('beeswax protocol is deprecated.')
|
||||
client = create_connection('%s:%d' % (self.hostname, self.beeswax_port),
|
||||
use_kerberos, BEESWAX)
|
||||
client = create_connection(to_host_port(self.external_interface, self.beeswax_port),
|
||||
use_kerberos, BEESWAX, use_ssl=self.use_ssl_for_clients())
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
def beeswax_port_is_open(self):
|
||||
"""Test if the beeswax port is open. Does not need to authenticate."""
|
||||
# Check if the port is open first to avoid chatty logging of Thrift connection.
|
||||
if not self.is_port_open(self.beeswax_port): return False
|
||||
if not self.is_port_open(self.external_interface, self.beeswax_port): return False
|
||||
|
||||
try:
|
||||
# The beeswax client will connect successfully even if not authenticated.
|
||||
@@ -475,31 +486,31 @@ class ImpaladService(BaseImpalaService):
|
||||
client.close()
|
||||
return True
|
||||
except Exception as e:
|
||||
LOG.info(e)
|
||||
return False
|
||||
|
||||
def create_ldap_beeswax_client(self, user, password, use_ssl=False):
|
||||
client = create_ldap_connection('%s:%d' % (self.hostname, self.beeswax_port),
|
||||
client = create_ldap_connection(to_host_port(self.hostname, self.beeswax_port),
|
||||
user=user, password=password, use_ssl=use_ssl)
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
def create_hs2_client(self, user=None):
|
||||
"""Creates a new HS2 client connection to the impalad"""
|
||||
client = create_connection('%s:%d' % (self.hostname, self.hs2_port),
|
||||
protocol=HS2, user=user)
|
||||
client = create_connection('%s:%d' % (self.external_interface, self.hs2_port),
|
||||
protocol=HS2, user=user,
|
||||
use_ssl=self.use_ssl_for_clients())
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
def hs2_port_is_open(self):
|
||||
"""Test if the HS2 port is open. Does not need to authenticate."""
|
||||
# Check if the port is open first to avoid chatty logging of Thrift connection.
|
||||
if not self.is_port_open(self.hs2_port): return False
|
||||
if not self.is_port_open(self.external_interface, self.hs2_port): return False
|
||||
|
||||
# Impyla will try to authenticate as part of connecting, so preserve previous logic
|
||||
# that uses the HS2 thrift code directly.
|
||||
try:
|
||||
sock = TSocket(self.hostname, self.hs2_port)
|
||||
sock = TSocket(self.external_interface, self.hs2_port)
|
||||
transport = TBufferedTransport(sock)
|
||||
transport.open()
|
||||
transport.close()
|
||||
@@ -510,7 +521,7 @@ class ImpaladService(BaseImpalaService):
|
||||
|
||||
def hs2_http_port_is_open(self):
|
||||
# Only check if the port is open, do not create Thrift transport.
|
||||
return self.is_port_open(self.hs2_http_port)
|
||||
return self.is_port_open(self.external_interface, self.hs2_http_port)
|
||||
|
||||
def create_client(self, protocol):
|
||||
"""Creates a new client connection for given protocol to this impalad"""
|
||||
@@ -520,7 +531,8 @@ class ImpaladService(BaseImpalaService):
|
||||
if protocol == BEESWAX:
|
||||
LOG.warning('beeswax protocol is deprecated.')
|
||||
port = self.beeswax_port
|
||||
client = create_connection('%s:%d' % (self.hostname, port), protocol=protocol)
|
||||
client = create_connection(to_host_port(self.external_interface, port),
|
||||
protocol=protocol)
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
@@ -537,9 +549,10 @@ class ImpaladService(BaseImpalaService):
|
||||
# accessing the debug webpage.
|
||||
class StateStoredService(BaseImpalaService):
|
||||
def __init__(self, hostname, webserver_interface, webserver_port,
|
||||
webserver_certificate_file, service_port):
|
||||
webserver_certificate_file, ssl_client_ca_certificate_file, service_port):
|
||||
super(StateStoredService, self).__init__(
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file)
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file,
|
||||
ssl_client_ca_certificate_file)
|
||||
self.service_port = service_port
|
||||
|
||||
def wait_for_live_subscribers(self, num_subscribers, timeout=15, interval=1):
|
||||
@@ -554,9 +567,10 @@ class StateStoredService(BaseImpalaService):
|
||||
# accessing the debug webpage.
|
||||
class CatalogdService(BaseImpalaService):
|
||||
def __init__(self, hostname, webserver_interface, webserver_port,
|
||||
webserver_certificate_file, service_port):
|
||||
webserver_certificate_file, ssl_client_ca_certificate_file, service_port):
|
||||
super(CatalogdService, self).__init__(
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file)
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file,
|
||||
ssl_client_ca_certificate_file)
|
||||
self.service_port = service_port
|
||||
|
||||
def get_catalog_version(self, timeout=10, interval=1):
|
||||
@@ -579,6 +593,7 @@ class CatalogdService(BaseImpalaService):
|
||||
|
||||
class AdmissiondService(BaseImpalaService):
|
||||
def __init__(self, hostname, webserver_interface, webserver_port,
|
||||
webserver_certificate_file):
|
||||
webserver_certificate_file, ssl_client_ca_certificate_file):
|
||||
super(AdmissiondService, self).__init__(
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file)
|
||||
hostname, webserver_interface, webserver_port, webserver_certificate_file,
|
||||
ssl_client_ca_certificate_file)
|
||||
|
||||
@@ -49,6 +49,7 @@ from tests.common.environ import (
|
||||
from tests.common.errors import Timeout
|
||||
from tests.common.impala_connection import create_connection
|
||||
from tests.common.impala_service import ImpaladService
|
||||
from tests.common.network import to_host_port
|
||||
from tests.common.test_dimensions import (
|
||||
ALL_BATCH_SIZES,
|
||||
ALL_DISABLE_CODEGEN_OPTIONS,
|
||||
@@ -265,7 +266,11 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
cls.hs2_http_client = None
|
||||
cls.hive_client = None
|
||||
cls.hive_transport = None
|
||||
|
||||
# In case of custom cluster tests this returns the 1st impalad or None if nothing
|
||||
# is started.
|
||||
cls.impalad_test_service = cls.create_impala_service()
|
||||
ImpalaTestSuite.impalad_test_service = cls.impalad_test_service
|
||||
|
||||
# Override the shell history path so that commands run by any tests
|
||||
# don't write any history into the developer's file.
|
||||
@@ -377,10 +382,12 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
if protocol is None:
|
||||
protocol = cls.default_test_protocol()
|
||||
if host_port is None:
|
||||
host_port = cls.__get_default_host_port(protocol)
|
||||
host = cls.impalad_test_service.external_interface
|
||||
host_port = to_host_port(host, cls._get_default_port(protocol))
|
||||
use_ssl = cls.impalad_test_service.use_ssl_for_clients()
|
||||
client = create_connection(
|
||||
host_port=host_port, use_kerberos=pytest.config.option.use_kerberos,
|
||||
protocol=protocol, is_hive=is_hive, user=user)
|
||||
protocol=protocol, is_hive=is_hive, user=user, use_ssl=use_ssl)
|
||||
client.connect()
|
||||
return client
|
||||
|
||||
@@ -413,7 +420,7 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
host, port = host_port.split(':')
|
||||
port = str(int(port) + nth)
|
||||
host_port = host + ':' + port
|
||||
return ImpalaTestSuite.create_impala_client(host_port, protocol=protocol)
|
||||
return cls.create_impala_client(host_port, protocol=protocol)
|
||||
|
||||
@classmethod
|
||||
def create_impala_clients(cls):
|
||||
@@ -448,6 +455,11 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
@classmethod
|
||||
def close_impala_clients(cls):
|
||||
"""Closes Impala clients created by create_impala_clients()."""
|
||||
# cls.client should be equal to one of below, unless a test method implicitly overrides it.
|
||||
# Closing twice would lead to error in some clients (impyla+SSL).
|
||||
if cls.client not in (cls.beeswax_client, cls.hs2_client, cls.hs2_http_client):
|
||||
cls.client.close()
|
||||
cls.client = None
|
||||
if cls.beeswax_client:
|
||||
cls.beeswax_client.close()
|
||||
cls.beeswax_client = None
|
||||
@@ -457,11 +469,6 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
if cls.hs2_http_client:
|
||||
cls.hs2_http_client.close()
|
||||
cls.hs2_http_client = None
|
||||
# cls.client should be equal to one of above, unless test method implicitly override.
|
||||
# Closing twice should be OK.
|
||||
if cls.client:
|
||||
cls.client.close()
|
||||
cls.client = None
|
||||
|
||||
@classmethod
|
||||
def default_impala_client(cls, protocol):
|
||||
@@ -474,13 +481,13 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
raise Exception("unknown protocol: {0}".format(protocol))
|
||||
|
||||
@classmethod
|
||||
def __get_default_host_port(cls, protocol):
|
||||
def _get_default_port(cls, protocol):
|
||||
if protocol == BEESWAX:
|
||||
return IMPALAD
|
||||
return IMPALAD_BEESWAX_PORT
|
||||
elif protocol == HS2_HTTP:
|
||||
return IMPALAD_HS2_HTTP_HOST_PORT
|
||||
return IMPALAD_HS2_HTTP_PORT
|
||||
elif protocol == HS2:
|
||||
return IMPALAD_HS2_HOST_PORT
|
||||
return IMPALAD_HS2_PORT
|
||||
else:
|
||||
raise NotImplementedError("Not yet implemented: protocol=" + protocol)
|
||||
|
||||
@@ -497,13 +504,8 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
raise NotImplementedError("Not yet implemented: protocol=" + protocol)
|
||||
|
||||
@classmethod
|
||||
def create_impala_service(
|
||||
cls, host_port=IMPALAD, webserver_interface="", webserver_port=25000):
|
||||
host, port = host_port.split(':')
|
||||
if webserver_interface == "":
|
||||
webserver_interface = host
|
||||
return ImpaladService(host, beeswax_port=port,
|
||||
webserver_interface=webserver_interface, webserver_port=webserver_port)
|
||||
def create_impala_service(cls):
|
||||
return ImpaladService(IMPALAD_HOSTNAME)
|
||||
|
||||
@classmethod
|
||||
def create_hdfs_client(cls):
|
||||
@@ -773,7 +775,7 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
target_impalad_clients = list()
|
||||
if multiple_impalad:
|
||||
target_impalad_clients =\
|
||||
[ImpalaTestSuite.create_impala_client(host_port, protocol=protocol)
|
||||
[self.create_impala_client(host_port, protocol=protocol)
|
||||
for host_port in self.__get_cluster_host_ports(protocol)]
|
||||
else:
|
||||
target_impalad_clients = [self.default_impala_client(protocol)]
|
||||
@@ -822,7 +824,7 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
Helper to execute a query block in Hive. No special handling of query
|
||||
options is done, since we use a separate session for each block.
|
||||
"""
|
||||
h = ImpalaTestSuite.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2,
|
||||
h = self.create_impala_client(HIVE_HS2_HOST_PORT, protocol=HS2,
|
||||
is_hive=True)
|
||||
try:
|
||||
result = None
|
||||
@@ -1788,3 +1790,25 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
break
|
||||
properties[fields[1].rstrip()] = fields[2].rstrip()
|
||||
return properties
|
||||
|
||||
# Checks if an Impala connection is functional.
|
||||
@staticmethod
|
||||
def check_connection(conn):
|
||||
res = conn.execute("select 1 + 1")
|
||||
assert res.data == ["2"]
|
||||
|
||||
def check_connections(cls, expected_count=3):
  """Checks the default client and every per-protocol client that is set.

  'cls.client' is always checked. 'cls.beeswax_client', 'cls.hs2_client' and
  'cls.hs2_http_client' are checked, in that order, only if they are truthy.
  Asserts that the number of per-protocol clients checked equals 'expected_count'.

  NOTE(review): the first parameter is named 'cls' but no @classmethod decorator is
  visible here, so callers using self.check_connections() pass the instance.
  """
  # The default client must exist, so it is checked unconditionally.
  cls.check_connection(cls.client)
  checked = 0
  for attr_name in ("beeswax_client", "hs2_client", "hs2_http_client"):
    protocol_client = getattr(cls, attr_name)
    if not protocol_client:
      continue
    cls.check_connection(protocol_client)
    checked += 1
  assert checked == expected_count
|
||||
|
||||
@@ -19,7 +19,9 @@
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
import socket
|
||||
import ssl
|
||||
|
||||
from tests.common.environ import IS_REDHAT_DERIVATIVE
|
||||
|
||||
# Retrieves the host external IP rather than localhost/127.0.0.1 so we have an IP that
|
||||
# Impala will consider distinct from storage backends to force remote scheduling.
|
||||
@@ -30,3 +32,49 @@ def get_external_ip():
|
||||
# Timeout=0 means it doesn't need to resolve.
|
||||
s.connect(('10.254.254.254', 1))
|
||||
return s.getsockname()[0]
|
||||
|
||||
|
||||
def split_host_port(host_port):
  """Splits a "host[:port]" string into a (host, port) tuple.

  Returns (host, None) if no port is present, otherwise (host, port) with the port
  converted to int. Accepted forms:
    "host", "host:port"     -> host names and IPv4 addresses
    "[v6addr]", "[v6addr]:port" -> bracketed IPv6 literals; the brackets are removed
    "v6addr"                -> unbracketed IPv6 literal (more than one ":"); it cannot
                               carry a port, so the whole string is the host
  An empty string yields ("", None). Raises ValueError if the port is not an integer.
  """
  if host_port.startswith("["):
    # Everything up to the closing bracket is the address; what follows is either
    # empty or ":<port>".
    host, _, rest = host_port[1:].partition("]")
    if not rest:
      return (host, None)
    return (host, int(rest[1:]))
  if host_port.count(":") > 1:
    # Splitting an unbracketed IPv6 literal on ":" would return a fragment of the
    # address as the host (and possibly another fragment as the port).
    return (host_port, None)
  host, separator, port = host_port.partition(":")
  if not separator:
    return (host, None)
  return (host, int(port))
|
||||
|
||||
|
||||
def to_host_port(host, port):
  """Joins 'host' and 'port' into a single "host:port" string.

  A host containing ":" is treated as an IPv6 literal and rendered as "[host]:port"
  so the port separator stays unambiguous. A host that is already enclosed in
  brackets (e.g. "[::1]") is not wrapped a second time.
  """
  is_ipv6_address = ":" in host
  already_bracketed = host.startswith("[") and host.endswith("]")
  fmt = "[{0}]:{1}" if is_ipv6_address and not already_bracketed else "{0}:{1}"
  return fmt.format(host, port)
|
||||
|
||||
|
||||
CERT_TO_CA_MAP = {
|
||||
"wildcard-cert.pem": "wildcardCA.pem",
|
||||
"wildcard-san-cert.pem": "wildcardCA.pem"
|
||||
}
|
||||
|
||||
REQUIRED_MIN_OPENSSL_VERSION = 0x10001000
|
||||
# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5
|
||||
# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already
|
||||
if IS_REDHAT_DERIVATIVE:
|
||||
REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5)
|
||||
else:
|
||||
REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9)
|
||||
_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None)
|
||||
if _openssl_version_number is None:
|
||||
SKIP_SSL_MSG = "Legacy OpenSSL module detected"
|
||||
elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION:
|
||||
SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % (
|
||||
ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION)
|
||||
else:
|
||||
SKIP_SSL_MSG = None
|
||||
|
||||
@@ -24,34 +24,20 @@ import pytest
|
||||
import re
|
||||
import requests
|
||||
import signal
|
||||
import ssl
|
||||
import socket
|
||||
import sys
|
||||
import time
|
||||
|
||||
from tests.common.environ import IS_REDHAT_DERIVATIVE
|
||||
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.impala_service import ImpaladService
|
||||
from tests.common.network import SKIP_SSL_MSG, REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12
|
||||
from tests.common.test_dimensions import create_client_protocol_dimension
|
||||
from tests.common.test_vector import BEESWAX
|
||||
from tests.shell.util import run_impala_shell_cmd, run_impala_shell_cmd_no_expect, \
|
||||
ImpalaShell, create_impala_shell_executable_dimension
|
||||
|
||||
REQUIRED_MIN_OPENSSL_VERSION = 0x10001000
|
||||
# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5
|
||||
# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already
|
||||
if IS_REDHAT_DERIVATIVE:
|
||||
REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5)
|
||||
else:
|
||||
REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9)
|
||||
_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None)
|
||||
if _openssl_version_number is None:
|
||||
SKIP_SSL_MSG = "Legacy OpenSSL module detected"
|
||||
elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION:
|
||||
SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % (
|
||||
ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION)
|
||||
else:
|
||||
SKIP_SSL_MSG = None
|
||||
|
||||
CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
|
||||
|
||||
|
||||
@@ -99,7 +85,6 @@ class TestClientSsl(CustomClusterTestSuite):
|
||||
@CustomClusterTestSuite.with_args(impalad_args=SSL_ARGS, statestored_args=SSL_ARGS,
|
||||
catalogd_args=SSL_ARGS)
|
||||
def test_ssl(self, vector):
|
||||
|
||||
self._verify_negative_cases(vector)
|
||||
# TODO: This is really two different tests, but the custom cluster takes too long to
|
||||
# start. Make it so that custom clusters can be specified across test suites.
|
||||
@@ -140,6 +125,7 @@ class TestClientSsl(CustomClusterTestSuite):
|
||||
print(result.stderr)
|
||||
assert "Query Status: Cancelled" in result.stdout
|
||||
assert impalad.wait_for_num_in_flight_queries(0)
|
||||
self.check_connections()
|
||||
|
||||
WEBSERVER_SSL_ARGS = ("--webserver_certificate_file=%(cert_dir)s/server-cert.pem "
|
||||
"--webserver_private_key_file=%(cert_dir)s/server-key.pem "
|
||||
@@ -217,6 +203,7 @@ class TestClientSsl(CustomClusterTestSuite):
|
||||
|
||||
self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR,
|
||||
host="ip4.impala.test")
|
||||
self.check_connections()
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
@CustomClusterTestSuite.with_args(impalad_args=SSL_WILDCARD_SAN_ARGS,
|
||||
@@ -225,7 +212,6 @@ class TestClientSsl(CustomClusterTestSuite):
|
||||
@pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG)
|
||||
def test_wildcard_san_ssl(self, vector):
|
||||
""" Test for IMPALA-3159: Test with a certificate which has a wildcard as a SAN. """
|
||||
|
||||
# This block of code is the same as _validate_positive_cases() but we want to check
|
||||
# if retrieving the SAN is supported first.
|
||||
args = ["--ssl", "-q", "select 1 + 2", "--ca_cert=%s/wildcardCA.pem" % CERT_DIR]
|
||||
@@ -239,6 +225,8 @@ class TestClientSsl(CustomClusterTestSuite):
|
||||
|
||||
self._validate_positive_cases(vector, "%s/wildcardCA.pem" % CERT_DIR,
|
||||
host="ip4.impala.test")
|
||||
self.check_connections()
|
||||
|
||||
|
||||
def _verify_negative_cases(self, vector, host=""):
|
||||
# Expect the shell to not start successfully if we point --ca_cert to an incorrect
|
||||
|
||||
@@ -352,18 +352,18 @@ class TestEventProcessingError(CustomClusterTestSuite):
|
||||
replication tests
|
||||
"""
|
||||
# inserts on transactional tables
|
||||
TestEventProcessingBase._run_test_insert_events_impl(unique_database, True)
|
||||
TestEventProcessingBase._run_test_insert_events_impl(self, unique_database, True)
|
||||
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
|
||||
try:
|
||||
test_db = unique_database + "_no_transact"
|
||||
self.run_stmt_in_hive("""create database {}""".format(test_db))
|
||||
# inserts on external tables
|
||||
TestEventProcessingBase._run_test_insert_events_impl(test_db, False)
|
||||
TestEventProcessingBase._run_test_insert_events_impl(self, test_db, False)
|
||||
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
|
||||
finally:
|
||||
self.run_stmt_in_hive("""drop database {} cascade""".format(test_db))
|
||||
# replication related tests
|
||||
TestEventProcessingBase._run_event_based_replication_tests_impl(
|
||||
TestEventProcessingBase._run_event_based_replication_tests_impl(self,
|
||||
self.filesystem_client)
|
||||
assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
|
||||
|
||||
|
||||
251
tests/custom_cluster/test_ipv6.py
Normal file
251
tests/custom_cluster/test_ipv6.py
Normal file
@@ -0,0 +1,251 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.common.network import SKIP_SSL_MSG
|
||||
from tests.common.test_dimensions import create_client_protocol_dimension
|
||||
from tests.shell.util import run_impala_shell_cmd, \
|
||||
create_impala_shell_executable_dimension
|
||||
from tests.common.impala_connection import create_connection
|
||||
|
||||
LOG = logging.getLogger('impala_test_suite')
|
||||
|
||||
CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
|
||||
|
||||
# Use wildcard san cert to be flexible about host name.
|
||||
SSL_WILDCARD_SAN_ARGS = ("--ssl_client_ca_certificate={0}/wildcardCA.pem "
|
||||
"--ssl_server_certificate={0}/wildcard-san-cert.pem "
|
||||
"--ssl_private_key={0}/wildcard-san-cert.key "
|
||||
"--hostname={1} "
|
||||
"--state_store_host={1} "
|
||||
"--catalog_service_host={1} "
|
||||
"--webserver_certificate_file={0}/wildcard-san-cert.pem "
|
||||
"--webserver_private_key_file={0}/wildcard-san-cert.key "
|
||||
).format(CERT_DIR, "ip4.impala.test")
|
||||
|
||||
WILDCARD_CA_CERT_PATH = "%s/wildcardCA.pem" % CERT_DIR
|
||||
|
||||
IPV6_ONLY_IP_WEBSERBER_ARG = "--webserver_interface=::1 "
|
||||
IPV6_DUAL_IP_WEBSERBER_ARG = "--webserver_interface=:: "
|
||||
IPV6_ONLY_IP_QUERY_ARG = "--external_interface=::1 "
|
||||
IPV6_DUAL_IP_QUERY_ARG = "--external_interface=:: "
|
||||
IPV6_ONLY_IP_COORDINATOR_ARG = \
|
||||
"%s %s" % (IPV6_ONLY_IP_WEBSERBER_ARG, IPV6_ONLY_IP_QUERY_ARG)
|
||||
IPV6_DUAL_IP_COORDINATOR_ARG = \
|
||||
"%s %s" % (IPV6_DUAL_IP_WEBSERBER_ARG, IPV6_DUAL_IP_QUERY_ARG)
|
||||
|
||||
IPV6_ONLY_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip6.impala.test "
|
||||
IPV6_DUAL_HOSTNAME_WEBSERBER_ARG = "--webserver_interface=ip46.impala.test "
|
||||
IPV6_ONLY_HOSTNAME_QUERY_ARG = "--external_interface=::1 "
|
||||
IPV6_DUAL_HOSTNAME_QUERY_ARG = "--external_interface=:: "
|
||||
|
||||
WEBUI_PORTS = [25000, 25010, 25020]
|
||||
|
||||
# Error text can depend on both protocol and python version.
|
||||
CONN_ERR = ["Could not connect", "Connection refused"]
|
||||
CERT_ERR = ["doesn't match", "certificate verify failed"]
|
||||
WEB_CERT_ERR = "CertificateError"
|
||||
|
||||
|
||||
class TestIPv6Base(CustomClusterTestSuite):
  """Shared smoke-test helpers for the IPv6 custom cluster tests.

  Subclasses set 'ca_cert' to the path of a CA certificate to make the helpers
  connect with SSL/TLS; while it is None the helpers use plain connections.
  """
  ca_cert = None

  @classmethod
  def setup_class(cls):
    super(TestIPv6Base, cls).setup_class()

  @classmethod
  def add_test_dimensions(cls):
    super(TestIPv6Base, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_impala_shell_executable_dimension())

  @staticmethod
  def _is_expected_error(ex, expected_errors):
    """Returns True if the text of 'ex' contains any string in 'expected_errors'."""
    message = str(ex)
    return any(err in message for err in expected_errors)

  def _smoke(self, host, vector, expected_errors=()):
    """Connects to 'host' with the protocol taken from 'vector' and runs "select 1".

    If 'expected_errors' is empty the connection and the query must succeed.
    Otherwise the attempt must fail with an exception whose text contains at least
    one of the given strings; any other exception is re-raised.
    """
    proto = vector.get_value('protocol')
    port = self._get_default_port(proto)
    host_port = "%s:%d" % (host, port)
    use_ssl = self.ca_cert is not None
    try:
      conn = create_connection(host_port, protocol=proto, use_ssl=use_ssl)
      conn.connect()
      try:
        res = conn.execute("select 1")
      finally:
        # Do not leave the session open once the connection was established.
        conn.close()
    except Exception as ex:
      if self._is_expected_error(ex, expected_errors): return
      raise
    # The success checks are deliberately outside the try block: an AssertionError
    # raised inside it would be matched against 'expected_errors' by the handler
    # above, and its message can contain those very strings.
    assert not expected_errors, \
        "Connecting to %s succeeded, expected one of: %s" % (host_port, expected_errors)
    assert res.data == ["1"]

  def _webui_smoke(self, url, err=None):
    """Fetches the JSON form of the debug web UI root page at 'url'.

    If 'err' is None the request must succeed and the returned JSON must contain
    "glibc_version". Otherwise the request must fail with an exception whose text
    contains 'err'. Certificates are verified against 'ca_cert' when it is set.
    """
    verify = self.ca_cert if self.ca_cert else False
    try:
      other_info_page = requests.get(url + "/?json", verify=verify).text
    except Exception as ex:
      if not err: raise
      assert err in str(ex)
      return
    # Checked outside the try block so that an unexpected success cannot be mistaken
    # for the expected failure.
    assert err is None, "Request to %s succeeded, expected error: %s" % (url, err)
    other_info = json.loads(other_info_page)
    assert "glibc_version" in other_info

  def _shell_smoke(self, host, vector, expected_errors=()):
    """Runs "select 1" through impala-shell against 'host'.

    The port is the default port of the protocol in 'vector'. "--ssl" and
    "--ca_cert" are passed when 'ca_cert' is set. If 'expected_errors' is empty the
    command must succeed and report one fetched row. Otherwise it must fail with an
    exception whose text contains at least one of the given strings; any other
    exception is re-raised.
    """
    proto = vector.get_value('protocol')
    port = self._get_default_port(proto)
    host_port = "%s:%d" % (host, port)
    shell_options = ["-i", host_port, "-q", "select 1"]
    if self.ca_cert:
      shell_options.append("--ssl")
      shell_options.append("--ca_cert=" + self.ca_cert)
    try:
      result = run_impala_shell_cmd(vector, shell_options)
    except Exception as ex:
      if self._is_expected_error(ex, expected_errors): return
      raise
    assert not expected_errors, \
        "Shell connected to %s, expected one of: %s" % (host_port, expected_errors)
    assert "Fetched 1 row" in result.stderr
|
||||
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args=IPV6_DUAL_IP_WEBSERBER_ARG + IPV6_DUAL_IP_QUERY_ARG,
    statestored_args=IPV6_DUAL_IP_WEBSERBER_ARG,
    catalogd_args=IPV6_DUAL_IP_WEBSERBER_ARG)
class TestIPv6DualNoSsl(TestIPv6Base):
  """Cluster bound to "::" for the web UI and client interfaces, without SSL."""

  def test_ipv6_dual_no_ssl(self, vector):
    """Every IPv4 and IPv6 address and host name must be reachable."""
    webui_url_templates = [
        "http://127.0.0.1:%d",
        "http://[::1]:%d",
        "http://ip4.impala.test:%d",
        "http://ip6.impala.test:%d",
        "http://ip46.impala.test:%d",
    ]
    for port in WEBUI_PORTS:
      for url_template in webui_url_templates:
        self._webui_smoke(url_template % port)

    client_hosts = [
        "[::1]",
        "127.0.0.1",
        "ip4.impala.test",
        "ip6.impala.test",
        "ip46.impala.test",
    ]
    # All direct client checks run before the first impala-shell check.
    for client_host in client_hosts:
      self._smoke(client_host, vector)
    for client_host in client_hosts:
      self._shell_smoke(client_host, vector)
|
||||
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args=IPV6_ONLY_IP_WEBSERBER_ARG + IPV6_ONLY_IP_QUERY_ARG,
    statestored_args=IPV6_ONLY_IP_WEBSERBER_ARG,
    catalogd_args=IPV6_ONLY_IP_WEBSERBER_ARG)
class TestIPv6OnlyNoSsl(TestIPv6Base):
  """Cluster bound to "::1" for the web UI and client interfaces, without SSL."""

  def test_ipv6_only_no_ssl(self, vector):
    """IPv6 targets must be reachable; IPv4 targets must fail to connect."""
    self.check_connections()

    # (url template, expected error text or None when the request must succeed)
    webui_cases = [
        ("http://127.0.0.1:%d", "Connection refused"),
        ("http://[::1]:%d", None),
        ("http://ip4.impala.test:%d", "Connection refused"),
        ("http://ip6.impala.test:%d", None),
        ("http://ip46.impala.test:%d", None),
    ]
    for port in WEBUI_PORTS:
      for url_template, expected_err in webui_cases:
        self._webui_smoke(url_template % port, err=expected_err)

    # (host, extra positional arguments holding the expected errors, if any)
    client_cases = [
        ("[::1]", ()),
        ("127.0.0.1", (CONN_ERR,)),
        ("ip4.impala.test", (CONN_ERR,)),
        ("ip6.impala.test", ()),
        ("ip46.impala.test", ()),
    ]
    # All direct client checks run before the first impala-shell check.
    for client_host, extra_args in client_cases:
      self._smoke(client_host, vector, *extra_args)
    for client_host, extra_args in client_cases:
      self._shell_smoke(client_host, vector, *extra_args)
|
||||
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args=(IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + IPV6_DUAL_HOSTNAME_QUERY_ARG
                  + SSL_WILDCARD_SAN_ARGS),
    statestored_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + SSL_WILDCARD_SAN_ARGS,
    catalogd_args=IPV6_DUAL_HOSTNAME_WEBSERBER_ARG + SSL_WILDCARD_SAN_ARGS)
class TestIPv6DualSsl(TestIPv6Base):
  """Dual-stack cluster (web UI on ip46.impala.test, clients on "::") with SSL."""
  ca_cert = WILDCARD_CA_CERT_PATH

  @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG)
  def test_ipv6_dual_ssl(self, vector):
    """Host names must work over SSL; raw IP addresses are expected to fail."""
    self.check_connections()

    # (url template, expected error text or None when the request must succeed)
    webui_cases = [
        ("https://127.0.0.1:%d", WEB_CERT_ERR),
        ("https://[::1]:%d", WEB_CERT_ERR),
        ("https://ip4.impala.test:%d", None),
        ("https://ip6.impala.test:%d", None),
        ("https://ip46.impala.test:%d", None),
    ]
    for port in WEBUI_PORTS:
      for url_template, expected_err in webui_cases:
        self._webui_smoke(url_template % port, expected_err)

    # (host, extra positional arguments holding the expected errors, if any)
    client_cases = [
        ("[::1]", (CONN_ERR,)),
        ("127.0.0.1", (CONN_ERR,)),
        ("ip4.impala.test", ()),
        ("ip6.impala.test", ()),
        ("ip46.impala.test", ()),
    ]
    for client_host, extra_args in client_cases:
      self._smoke(client_host, vector, *extra_args)

    shell_cases = [
        ("[::1]", (CERT_ERR,)),
        ("127.0.0.1", (CERT_ERR,)),
        ("ip4.impala.test", ()),
        ("ip6.impala.test", ()),
        ("ip46.impala.test", ()),
    ]
    for shell_host, extra_args in shell_cases:
      self._shell_smoke(shell_host, vector, *extra_args)
|
||||
|
||||
|
||||
@CustomClusterTestSuite.with_args(
    impalad_args=(IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + IPV6_ONLY_HOSTNAME_QUERY_ARG
                  + SSL_WILDCARD_SAN_ARGS),
    statestored_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + SSL_WILDCARD_SAN_ARGS,
    catalogd_args=IPV6_ONLY_HOSTNAME_WEBSERBER_ARG + SSL_WILDCARD_SAN_ARGS)
class TestIPv6OnlySsl(TestIPv6Base):
  """IPv6-only cluster (web UI on ip6.impala.test, clients on "::1") with SSL."""
  ca_cert = WILDCARD_CA_CERT_PATH

  @pytest.mark.skipif(SKIP_SSL_MSG is not None, reason=SKIP_SSL_MSG)
  def test_ipv6_only_ssl(self, vector):
    """Only the IPv6-capable host names are expected to work over SSL."""
    self.check_connections()

    # (url template, expected error text or None when the request must succeed)
    webui_cases = [
        ("https://127.0.0.1:%d", WEB_CERT_ERR),
        ("https://[::1]:%d", WEB_CERT_ERR),
        ("https://ip4.impala.test:%d", "Connection refused"),
        ("https://ip6.impala.test:%d", None),
        ("https://ip46.impala.test:%d", None),
    ]
    for port in WEBUI_PORTS:
      for url_template, expected_err in webui_cases:
        self._webui_smoke(url_template % port, expected_err)

    # (host, extra positional arguments holding the expected errors, if any)
    client_cases = [
        ("[::1]", (CONN_ERR,)),
        ("127.0.0.1", (CONN_ERR,)),
        ("ip4.impala.test", (CONN_ERR,)),
        ("ip6.impala.test", ()),
        ("ip46.impala.test", ()),
    ]
    for client_host, extra_args in client_cases:
      self._smoke(client_host, vector, *extra_args)

    shell_cases = [
        ("[::1]", (CERT_ERR,)),
        ("127.0.0.1", (CONN_ERR,)),
        ("ip4.impala.test", (CONN_ERR,)),
        ("ip6.impala.test", ()),
        ("ip46.impala.test", ()),
    ]
    for shell_host, extra_args in shell_cases:
      self._shell_smoke(shell_host, vector, *extra_args)
|
||||
@@ -39,6 +39,10 @@ class TestRedaction(CustomClusterTestSuite):
|
||||
limited to table data and query text since queries may refer to table data.
|
||||
'''
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(TestRedaction, cls).setup_class()
|
||||
|
||||
@property
|
||||
def log_dir(self):
|
||||
return os.path.join(self.tmp_dir, "logs")
|
||||
|
||||
@@ -35,10 +35,14 @@ PROCESSING_TIMEOUT_S = 10
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@SkipIfFS.hive
|
||||
class TestEventProcessing(ImpalaTestSuite):
|
||||
class TestEventProcessing(TestEventProcessingBase):
|
||||
"""This class contains tests that exercise the event processing mechanism in the
|
||||
catalog."""
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(TestEventProcessing, cls).setup_class()
|
||||
|
||||
@classmethod
|
||||
def default_test_protocol(cls):
|
||||
return HS2
|
||||
@@ -47,13 +51,13 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
def test_transactional_insert_events(self, unique_database):
|
||||
"""Executes 'run_test_insert_events' for transactional tables.
|
||||
"""
|
||||
TestEventProcessingBase._run_test_insert_events_impl(
|
||||
TestEventProcessingBase._run_test_insert_events_impl(self,
|
||||
unique_database, is_transactional=True)
|
||||
|
||||
def test_insert_events(self, unique_database):
|
||||
"""Executes 'run_test_insert_events' for non-transactional tables.
|
||||
"""
|
||||
TestEventProcessingBase._run_test_insert_events_impl(unique_database)
|
||||
TestEventProcessingBase._run_test_insert_events_impl(self, unique_database)
|
||||
|
||||
def test_iceberg_inserts(self):
|
||||
"""IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation
|
||||
@@ -99,13 +103,12 @@ class TestEventProcessing(ImpalaTestSuite):
|
||||
self._run_test_empty_partition_events(unique_database, False)
|
||||
|
||||
def test_event_based_replication(self):
|
||||
TestEventProcessingBase._run_event_based_replication_tests_impl(
|
||||
self._run_event_based_replication_tests_impl(self,
|
||||
self.filesystem_client)
|
||||
|
||||
def _run_test_empty_partition_events(self, unique_database, is_transactional):
|
||||
test_tbl = unique_database + ".test_events"
|
||||
TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
|
||||
is_transactional)
|
||||
TBLPROPERTIES = self._get_transactional_tblproperties(is_transactional)
|
||||
self.run_stmt_in_hive("create table {0} (key string, value string) \
|
||||
partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
|
||||
self.client.set_configuration({
|
||||
|
||||
@@ -27,17 +27,22 @@ EVENT_SYNC_QUERY_OPTIONS = {
|
||||
class TestEventProcessingBase(ImpalaTestSuite):
|
||||
|
||||
@classmethod
|
||||
def _run_test_insert_events_impl(cls, unique_database, is_transactional=False):
|
||||
def setup_class(cls):
|
||||
super(TestEventProcessingBase, cls).setup_class()
|
||||
|
||||
@classmethod
|
||||
def _run_test_insert_events_impl(cls, suite, unique_database, is_transactional=False):
|
||||
"""Test for insert event processing. Events are created in Hive and processed in
|
||||
Impala. The following cases are tested :
|
||||
Insert into table --> for partitioned and non-partitioned table
|
||||
Insert overwrite table --> for partitioned and non-partitioned table
|
||||
Insert into partition --> for partitioned table
|
||||
"""
|
||||
with cls.create_impala_client() as impala_client:
|
||||
# TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
|
||||
with suite.create_impala_client() as impala_client:
|
||||
# Test table with no partitions.
|
||||
tbl_insert_nopart = 'tbl_insert_nopart'
|
||||
cls.run_stmt_in_hive(
|
||||
suite.run_stmt_in_hive(
|
||||
"drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
|
||||
tblproperties = ""
|
||||
if is_transactional:
|
||||
@@ -141,16 +146,18 @@ class TestEventProcessingBase(ImpalaTestSuite):
|
||||
assert len(result.data) == 0
|
||||
|
||||
@classmethod
|
||||
def _run_event_based_replication_tests_impl(cls, filesystem_client, transactional=True):
|
||||
def _run_event_based_replication_tests_impl(cls, suite,
|
||||
filesystem_client, transactional=True):
|
||||
"""Hive Replication relies on the insert events generated on the tables.
|
||||
This test issues some basic replication commands from Hive and makes sure
|
||||
that the replicated table has correct data."""
|
||||
# TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
|
||||
TBLPROPERTIES = cls._get_transactional_tblproperties(transactional)
|
||||
source_db = ImpalaTestSuite.get_random_name("repl_source_")
|
||||
target_db = ImpalaTestSuite.get_random_name("repl_target_")
|
||||
unpartitioned_tbl = "unpart_tbl"
|
||||
partitioned_tbl = "part_tbl"
|
||||
impala_client = cls.create_impala_client()
|
||||
impala_client = suite.create_impala_client()
|
||||
try:
|
||||
cls.run_stmt_in_hive("create database {0}".format(source_db))
|
||||
cls.run_stmt_in_hive(
|
||||
|
||||
@@ -93,7 +93,7 @@ class TestAcidInsertsBasic(TestAcidStress):
|
||||
def _impala_role_write_inserts(self, tbl_name, partitioned):
|
||||
"""INSERT INTO/OVERWRITE a table several times from Impala."""
|
||||
try:
|
||||
impalad_client = ImpalaTestSuite.create_impala_client()
|
||||
impalad_client = self.create_impala_client()
|
||||
part_expr = "partition (p=1)" if partitioned else ""
|
||||
for run in range(0, NUM_OVERWRITES + 1):
|
||||
OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
|
||||
@@ -109,7 +109,7 @@ class TestAcidInsertsBasic(TestAcidStress):
|
||||
def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds):
|
||||
"""SELECT from a table many times until the expected final values are found."""
|
||||
try:
|
||||
impalad_client = ImpalaTestSuite.create_impala_client()
|
||||
impalad_client = self.create_impala_client()
|
||||
expected_result = {"run": -1, "i": 0}
|
||||
accept_empty_table = True
|
||||
while expected_result["run"] != NUM_OVERWRITES and \
|
||||
@@ -182,7 +182,7 @@ class TestAcidInsertsBasic(TestAcidStress):
|
||||
def _impala_role_partition_writer(self, tbl_name, partition, is_overwrite, sleep_sec):
|
||||
insert_op = "OVERWRITE" if is_overwrite else "INTO"
|
||||
try:
|
||||
impalad_client = ImpalaTestSuite.create_impala_client()
|
||||
impalad_client = self.create_impala_client()
|
||||
impalad_client.execute(
|
||||
"""insert {op} table {tbl_name} partition({partition})
|
||||
select sleep({sleep_ms})""".format(op=insert_op, tbl_name=tbl_name,
|
||||
|
||||
@@ -36,6 +36,10 @@ class TestInsertStress(ImpalaTestSuite):
|
||||
def get_workload(self):
|
||||
return 'targeted-stress'
|
||||
|
||||
@classmethod
|
||||
def setup_class(cls):
|
||||
super(TestInsertStress, cls).setup_class()
|
||||
|
||||
@classmethod
|
||||
def add_test_dimensions(cls):
|
||||
super(TestInsertStress, cls).add_test_dimensions()
|
||||
@@ -46,8 +50,8 @@ class TestInsertStress(ImpalaTestSuite):
|
||||
def _impala_role_concurrent_writer(self, tbl_name, wid, num_inserts, counter):
|
||||
"""Writes ascending numbers up to 'num_inserts' into column 'i'. To column 'wid' it
|
||||
writes its identifier passed in parameter 'wid'."""
|
||||
target_impalad = wid % ImpalaTestSuite.get_impalad_cluster_size()
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = wid % self.get_impalad_cluster_size()
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
try:
|
||||
insert_cnt = 0
|
||||
while insert_cnt < num_inserts:
|
||||
@@ -72,8 +76,8 @@ class TestInsertStress(ImpalaTestSuite):
|
||||
assert sorted_run == list(range(sorted_run[0], sorted_run[-1] + 1)), \
|
||||
"wid: %d" % wid
|
||||
|
||||
target_impalad = cid % ImpalaTestSuite.get_impalad_cluster_size()
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = cid % self.get_impalad_cluster_size()
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
try:
|
||||
while counter.value != writers:
|
||||
result = impalad_client.execute("select * from %s" % tbl_name)
|
||||
|
||||
@@ -41,8 +41,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite):
|
||||
|
||||
def _impala_role_concurrent_updater(self, tbl_name, col, num_writes):
|
||||
"""Increments values in column 'total' and in the column which is passed in 'col'."""
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
merge_stmt = """merge into {0} target using
|
||||
{0} source on source.total = target.total
|
||||
when matched then update set
|
||||
@@ -61,8 +61,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite):
|
||||
|
||||
def _impala_role_concurrent_writer(self, tbl_name, num_inserts):
|
||||
"""Adds a new row based on the maximum 'total' value."""
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
merge_stmt = """merge into {0} target using
|
||||
(select total, a, b, c from {0} order by total desc limit 1) source
|
||||
on source.total +1 = target.total
|
||||
@@ -92,8 +92,8 @@ class TestIcebergConcurrentMergeStress(ImpalaTestSuite):
|
||||
assert total == a + b + c
|
||||
return max_total
|
||||
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
total = 0
|
||||
while total < target_total:
|
||||
result = impalad_client.execute("select * from %s" % tbl_name)
|
||||
|
||||
@@ -82,8 +82,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
|
||||
|
||||
def _impala_role_concurrent_writer(self, tbl_name, col, num_updates):
|
||||
"""Increments values in column 'total' and in the column which is passed in 'col'."""
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
update_cnt = 0
|
||||
while update_cnt < num_updates:
|
||||
try:
|
||||
@@ -107,8 +107,8 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
|
||||
assert total == a + b + c
|
||||
return total
|
||||
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
total = 0
|
||||
while total < target_total:
|
||||
result = impalad_client.execute("select * from %s" % tbl_name)
|
||||
@@ -163,8 +163,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
|
||||
|
||||
def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, num_rows):
|
||||
"""Deletes every row from the table one by one."""
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
impalad_client.set_configuration_option("SYNC_DDL", "true")
|
||||
i = 0
|
||||
while i < num_rows:
|
||||
@@ -181,8 +181,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
|
||||
|
||||
def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted):
|
||||
"""Updates every row in the table in a loop."""
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
impalad_client.set_configuration_option("SYNC_DDL", "true")
|
||||
while all_rows_deleted.value != 1:
|
||||
try:
|
||||
@@ -196,8 +196,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
|
||||
|
||||
def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted):
|
||||
"""Optimizes the table in a loop."""
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
impalad_client.set_configuration_option("SYNC_DDL", "true")
|
||||
while all_rows_deleted.value != 1:
|
||||
try:
|
||||
@@ -253,8 +253,8 @@ class TestIcebergConcurrentOperations(ImpalaTestSuite):
|
||||
prev_j = j
|
||||
assert prev_id == num_rows - 1
|
||||
|
||||
target_impalad = random.randint(0, ImpalaTestSuite.get_impalad_cluster_size() - 1)
|
||||
impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
|
||||
target_impalad = random.randint(0, self.get_impalad_cluster_size() - 1)
|
||||
impalad_client = self.create_client_for_nth_impalad(target_impalad)
|
||||
while all_rows_deleted.value != 1:
|
||||
result = impalad_client.execute("select * from %s order by id" % tbl_name)
|
||||
verify_result_set(result)
|
||||
|
||||
Reference in New Issue
Block a user