mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-6948,IMPALA-6962: add end-to-end tests
Adds end-to-end tests to validate that following various metadata operations, the catalog state in catalogd and impalads is the same. For IMPALA-6962, catalogd process restart for tests is fixed. Change-Id: Ic6c5b39e29b2885cd30fede18833cbf23fb755f5 Reviewed-on: http://gerrit.cloudera.org:8080/10291 Reviewed-by: Alex Behm <alex.behm@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
6a86c8e8d4
commit
28c1f76529
@@ -265,7 +265,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
|
||||
[[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
|
||||
while (1) {
|
||||
unique_lock<mutex> unique_lock(catalog_lock_);
|
||||
// Protect against spurious wakups by checking the value of topic_updates_ready_.
|
||||
// Protect against spurious wake-ups by checking the value of topic_updates_ready_.
|
||||
// It is only safe to continue on and update the shared pending_topic_updates_
|
||||
// when topic_updates_ready_ is false, otherwise we may be in the middle of
|
||||
// processing a heartbeat.
|
||||
@@ -311,6 +311,16 @@ void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
|
||||
document->AddMember("error", error, document->GetAllocator());
|
||||
return;
|
||||
}
|
||||
long current_catalog_version;
|
||||
status = catalog_->GetCatalogVersion(¤t_catalog_version);
|
||||
if (status.ok()) {
|
||||
Value version_value;
|
||||
version_value.SetInt(current_catalog_version);
|
||||
document->AddMember("version", version_value, document->GetAllocator());
|
||||
} else {
|
||||
Value error(status.GetDetail().c_str(), document->GetAllocator());
|
||||
document->AddMember("versionError", error, document->GetAllocator());
|
||||
}
|
||||
Value databases(kArrayType);
|
||||
for (const TDatabase& db: get_dbs_result.dbs) {
|
||||
Value database(kObjectType);
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
# Basic object model of an Impala cluster (set of Impala processes).
|
||||
#
|
||||
import logging
|
||||
import os
|
||||
import psutil
|
||||
import socket
|
||||
from getpass import getuser
|
||||
@@ -35,6 +36,9 @@ logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
|
||||
LOG = logging.getLogger('impala_cluster')
|
||||
LOG.setLevel(level=logging.DEBUG)
|
||||
|
||||
IMPALA_HOME = os.environ['IMPALA_HOME']
|
||||
CATALOGD_PATH = os.path.join(IMPALA_HOME, 'bin/start-catalogd.sh')
|
||||
|
||||
# Represents a set of Impala processes. Each Impala process must be created with
|
||||
# a basic set of command line options (beeswax_port, webserver_port, etc)
|
||||
class ImpalaCluster(object):
|
||||
@@ -252,3 +256,12 @@ class CatalogdProcess(BaseImpalaProcess):
|
||||
|
||||
def __get_port(self, default=None):
|
||||
return int(self._get_arg_value('catalog_service_port', default))
|
||||
|
||||
def start(self, wait_until_ready=True):
|
||||
"""Starts catalogd and waits until the service is ready to accept connections."""
|
||||
restart_cmd = [CATALOGD_PATH] + self.cmd[1:] + ["&"]
|
||||
LOG.info("Starting Catalogd process: %s" % ' '.join(restart_cmd))
|
||||
os.system(' '.join(restart_cmd))
|
||||
if wait_until_ready:
|
||||
self.service.wait_for_metric_value('statestore-subscriber.connected',
|
||||
expected_value=1, timeout=30)
|
||||
|
||||
@@ -134,6 +134,35 @@ class BaseImpalaService(object):
|
||||
json.dumps(self.read_debug_webpage('threadz?json')),
|
||||
json.dumps(self.read_debug_webpage('rpcz?json')))
|
||||
|
||||
def get_catalog_object_dump(self, object_type, object_name):
|
||||
""" Gets the web-page for the given 'object_type' and 'object_name'."""
|
||||
return self.read_debug_webpage('catalog_object?object_type=%s&object_name=%s' %\
|
||||
(object_type, object_name))
|
||||
|
||||
def get_catalog_objects(self, excludes=['_impala_builtins']):
|
||||
""" Returns a dictionary containing all catalog objects. Each entry's key is the fully
|
||||
qualified object name and the value is a tuple of the form (type, version).
|
||||
Does not return databases listed in the 'excludes' list."""
|
||||
catalog = self.get_debug_webpage_json('catalog')
|
||||
objects = {}
|
||||
for db_desc in catalog["databases"]:
|
||||
db_name = db_desc["name"]
|
||||
if db_name in excludes:
|
||||
continue
|
||||
db = self.get_catalog_object_dump('DATABASE', db_name)
|
||||
objects[db_name] = ('DATABASE', self.extract_catalog_object_version(db))
|
||||
for table_desc in db_desc["tables"]:
|
||||
table_name = table_desc["fqtn"]
|
||||
table = self.get_catalog_object_dump('TABLE', table_name)
|
||||
objects[table_name] = ('TABLE', self.extract_catalog_object_version(table))
|
||||
return objects
|
||||
|
||||
def extract_catalog_object_version(self, thrift_txt):
|
||||
""" Extracts and returns the version of the catalog object's 'thrift_txt' representation."""
|
||||
result = re.search(r'catalog_version \(i64\) = (\d+)', thrift_txt)
|
||||
assert result, 'Unable to find catalog version in object: ' + thrift_txt
|
||||
return int(result.group(1))
|
||||
|
||||
# Allows for interacting with an Impalad instance to perform operations such as creating
|
||||
# new connections or accessing the debug webpage.
|
||||
class ImpaladService(BaseImpalaService):
|
||||
@@ -260,10 +289,6 @@ class ImpaladService(BaseImpalaService):
|
||||
hs2_client = TCLIService.Client(protocol)
|
||||
return hs2_client
|
||||
|
||||
def get_catalog_object_dump(self, object_type, object_name):
|
||||
return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
|
||||
(object_type, object_name))
|
||||
|
||||
|
||||
# Allows for interacting with the StateStore service to perform operations such as
|
||||
# accessing the debug webpage.
|
||||
@@ -283,6 +308,6 @@ class CatalogdService(BaseImpalaService):
|
||||
super(CatalogdService, self).__init__(hostname, webserver_port)
|
||||
self.service_port = service_port
|
||||
|
||||
def get_catalog_object_dump(self, object_type, object_name):
|
||||
return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
|
||||
(object_type, object_name))
|
||||
def get_catalog_version(self):
|
||||
""" Gets catalogd's latest catalog version. """
|
||||
return self.get_debug_webpage_json('catalog')["version"]
|
||||
|
||||
107
tests/custom_cluster/test_metadata_replicas.py
Normal file
107
tests/custom_cluster/test_metadata_replicas.py
Normal file
@@ -0,0 +1,107 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import pytest
|
||||
import re
|
||||
from time import sleep
|
||||
from tests.common.environ import specific_build_type_timeout
|
||||
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
|
||||
from tests.util.hive_utils import HiveDbWrapper
|
||||
|
||||
class TestMetadataReplicas(CustomClusterTestSuite):
|
||||
""" Validates metadata content across catalogd and impalad coordinators."""
|
||||
|
||||
@classmethod
|
||||
def get_workload(cls):
|
||||
return 'functional-query'
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_start(self):
|
||||
""" Baseline to verify that the initial state is identical. No DDL/DML
|
||||
is processed, so no objects are fully loaded."""
|
||||
self.__validate_metadata()
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_catalog_restart(self, testid_checksum):
|
||||
""" IMPALA-6948: reproduces the issue by deleting a table from Hive while the catalogd
|
||||
is down. When catalogd is restarted, if the regression is present, the deleted
|
||||
table will still be present at the impalads."""
|
||||
db_name = "test_catalog_restart_%s" % testid_checksum
|
||||
try:
|
||||
with HiveDbWrapper(self, db_name):
|
||||
# Issue several invalidates to boost the version for the current incarnation of the
|
||||
# catalog. As a result, the table we'll add to Hive will get a version that's easier
|
||||
# to see is higher than the highest version of the restarted catalogd incarnation.
|
||||
for i in xrange(0, 50):
|
||||
self.client.execute("invalidate metadata functional.alltypes")
|
||||
assert self.cluster.catalogd.service.get_catalog_version() >= 50
|
||||
# Creates a database and table with Hive and makes it visible to Impala.
|
||||
self.run_stmt_in_hive("create table %s.x (a string)" % db_name)
|
||||
self.client.execute("invalidate metadata %s.x" % db_name)
|
||||
assert "x" in self.client.execute("show tables in %s" % db_name).data
|
||||
# Stops the catalog
|
||||
self.cluster.catalogd.kill()
|
||||
# Drops the table from the catalog using Hive.
|
||||
self.run_stmt_in_hive("drop table %s.x" % db_name)
|
||||
# Restarts the catalog
|
||||
self.cluster.catalogd.start()
|
||||
# Refreshes the state of the catalogd process.
|
||||
self.cluster.refresh()
|
||||
# Wait until the impalad catalog versions agree with the catalogd's version.
|
||||
catalogd_version = self.cluster.catalogd.service.get_catalog_version()
|
||||
for impalad in self.cluster.impalads:
|
||||
impalad.service.wait_for_metric_value("catalog.curr-version", catalogd_version)
|
||||
|
||||
self.__validate_metadata()
|
||||
except Exception as e:
|
||||
assert False, "Unexpected exception: " + str(e)
|
||||
finally:
|
||||
# Hack to work-around IMPALA-5695.
|
||||
self.cluster.catalogd.kill()
|
||||
|
||||
def __validate_metadata(self):
|
||||
""" Computes the pair-wise object version difference between the catalog contents
|
||||
in catalogd and each impalad. Asserts that there are no differences."""
|
||||
c_objects = self.cluster.catalogd.service.get_catalog_objects()
|
||||
i_objects = [proc.service.get_catalog_objects() for proc in self.cluster.impalads]
|
||||
|
||||
for idx in xrange(0, len(i_objects)):
|
||||
i_obj = i_objects[idx]
|
||||
diff = self.__diff_catalog_objects(c_objects, i_obj)
|
||||
assert diff[0] == {},\
|
||||
'catalogd has objects not in impalad(%d): %s ' % (idx, diff[0])
|
||||
assert diff[1] == {}, 'impalad(%d) has objects not in catalogd: %s' % (idx, diff[1])
|
||||
assert diff[2] is None,\
|
||||
'impalad(%d) and catalogd version for objects differs: %s' % (idx, diff[2])
|
||||
|
||||
def __diff_catalog_objects(self, a, b):
|
||||
""" Computes the diff between the input 'a' and 'b' dictionaries. The result is a
|
||||
list of length 3 where position 0 holds those entries that are in a, but not b,
|
||||
position 1 those entries that are in b, but not a, and position 2 holds entries
|
||||
where the key is in both a and b, but whose value differs."""
|
||||
# diff[0] : a - b
|
||||
# diff[1] : b - a
|
||||
# diff[2] : a[k] != b[k]
|
||||
diff = [None, None, None]
|
||||
diff[0] = dict((k, a[k]) for k in set(a) - set(b))
|
||||
diff[1] = dict((k, b[k]) for k in set(b) - set(a))
|
||||
for k, v_a in a.items():
|
||||
v_b = b[k]
|
||||
if v_b is not None:
|
||||
if v_b != v_a:
|
||||
diff[2][k] = (v_a, v_b)
|
||||
return diff
|
||||
@@ -34,6 +34,7 @@ from tests.common.skip import SkipIfS3, SkipIfADLS, SkipIfIsilon, SkipIfLocal
|
||||
from tests.common.test_dimensions import (
|
||||
create_single_exec_option_dimension,
|
||||
create_uncompressed_text_dimension)
|
||||
from tests.util.hive_utils import HiveDbWrapper, HiveTableWrapper
|
||||
|
||||
import logging
|
||||
|
||||
@@ -147,45 +148,6 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
def __exit__(self, typ, value, traceback):
|
||||
self.impala.client.execute('drop table if exists %s' % self.table_name)
|
||||
|
||||
class HiveDbWrapper(object):
|
||||
"""
|
||||
A wrapper class for using `with` guards with databases created through Hive
|
||||
ensuring deletion even if an exception occurs.
|
||||
"""
|
||||
|
||||
def __init__(self, hive, db_name):
|
||||
self.hive = hive
|
||||
self.db_name = db_name
|
||||
|
||||
def __enter__(self):
|
||||
self.hive.run_stmt_in_hive(
|
||||
'create database if not exists ' + self.db_name)
|
||||
return self.db_name
|
||||
|
||||
def __exit__(self, typ, value, traceback):
|
||||
self.hive.run_stmt_in_hive(
|
||||
'drop database if exists %s cascade' % self.db_name)
|
||||
|
||||
class HiveTableWrapper(object):
|
||||
"""
|
||||
A wrapper class for using `with` guards with tables created through Hive
|
||||
ensuring deletion even if an exception occurs.
|
||||
"""
|
||||
|
||||
def __init__(self, hive, table_name, table_spec):
|
||||
self.hive = hive
|
||||
self.table_name = table_name
|
||||
self.table_spec = table_spec
|
||||
|
||||
def __enter__(self):
|
||||
self.hive.run_stmt_in_hive(
|
||||
'create table if not exists %s %s' %
|
||||
(self.table_name, self.table_spec))
|
||||
return self.table_name
|
||||
|
||||
def __exit__(self, typ, value, traceback):
|
||||
self.hive.run_stmt_in_hive('drop table if exists %s' % self.table_name)
|
||||
|
||||
def impala_table_stats(self, table):
|
||||
"""Returns a dictionary of stats for a table according to Impala."""
|
||||
output = self.client.execute('show table stats %s' % table).get_data()
|
||||
@@ -288,13 +250,11 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_hive_db_hive_table_add_partition(self, vector):
|
||||
self.add_hive_partition_helper(vector, self.HiveDbWrapper,
|
||||
self.HiveTableWrapper)
|
||||
self.add_hive_partition_helper(vector, HiveDbWrapper, HiveTableWrapper)
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_hive_db_impala_table_add_partition(self, vector):
|
||||
self.add_hive_partition_helper(vector, self.HiveDbWrapper,
|
||||
self.ImpalaTableWrapper)
|
||||
self.add_hive_partition_helper(vector, HiveDbWrapper, self.ImpalaTableWrapper)
|
||||
|
||||
@pytest.mark.execute_serially
|
||||
def test_impala_db_impala_table_add_partition(self, vector):
|
||||
@@ -304,7 +264,7 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
@pytest.mark.execute_serially
|
||||
def test_impala_db_hive_table_add_partition(self, vector):
|
||||
self.add_hive_partition_helper(vector, self.ImpalaDbWrapper,
|
||||
self.HiveTableWrapper)
|
||||
HiveTableWrapper)
|
||||
|
||||
@pytest.mark.xfail(run=False, reason="This is a bug: IMPALA-2426")
|
||||
@pytest.mark.execute_serially
|
||||
@@ -493,9 +453,9 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
@pytest.mark.execute_serially
|
||||
def test_compute_stats_get_to_impala(self, vector):
|
||||
"""Column stats computed in Hive are also visible in Impala."""
|
||||
with self.HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int)') as table_name:
|
||||
with HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int)') as table_name:
|
||||
hive_stats = self.hive_column_stats(table_name, 'x')
|
||||
self.client.execute('invalidate metadata')
|
||||
self.client.execute('refresh %s' % table_name)
|
||||
@@ -577,7 +537,7 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
"""
|
||||
|
||||
test_db = self.unique_string()
|
||||
with self.HiveDbWrapper(self, test_db) as db_name:
|
||||
with HiveDbWrapper(self, test_db) as db_name:
|
||||
pass
|
||||
self.assert_sql_error(
|
||||
self.client.execute,
|
||||
@@ -596,9 +556,9 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
"""
|
||||
# TODO: check results of insert, then select * before and after
|
||||
# storage format change.
|
||||
with self.HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int) stored as parquet') as table_name:
|
||||
with HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int) stored as parquet') as table_name:
|
||||
self.client.execute('invalidate metadata')
|
||||
self.client.execute('invalidate metadata %s' % table_name)
|
||||
print self.impala_table_stats(table_name)
|
||||
@@ -612,9 +572,9 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
def test_change_column_type(self, vector):
|
||||
"""Hive column type changes propagate to Impala."""
|
||||
|
||||
with self.HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int)') as table_name:
|
||||
with HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int)') as table_name:
|
||||
self.run_stmt_in_hive(
|
||||
'insert into table %s values (33,44)' % table_name)
|
||||
self.run_stmt_in_hive('alter table %s change y y string' % table_name)
|
||||
@@ -633,9 +593,9 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
known issue with changing column types in Hive/parquet.
|
||||
"""
|
||||
|
||||
with self.HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int) stored as parquet') as table_name:
|
||||
with HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int) stored as parquet') as table_name:
|
||||
self.run_stmt_in_hive(
|
||||
'insert into table %s values (33,44)' % table_name)
|
||||
assert '33,44' == self.run_stmt_in_hive(
|
||||
@@ -661,9 +621,9 @@ class TestHmsIntegration(ImpalaTestSuite):
|
||||
metadata'.
|
||||
"""
|
||||
|
||||
with self.HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with self.HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int)') as table_name:
|
||||
with HiveDbWrapper(self, self.unique_string()) as db_name:
|
||||
with HiveTableWrapper(self, db_name + '.' + self.unique_string(),
|
||||
'(x int, y int)') as table_name:
|
||||
self.client.execute('invalidate metadata')
|
||||
int_column = {'type': 'int', 'comment': ''}
|
||||
expected_columns = {'x': int_column, 'y': int_column}
|
||||
|
||||
59
tests/util/hive_utils.py
Normal file
59
tests/util/hive_utils.py
Normal file
@@ -0,0 +1,59 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
# Utilities for interacting with Hive.
|
||||
|
||||
|
||||
class HiveDbWrapper(object):
|
||||
"""
|
||||
A wrapper class for using `with` guards with databases created through Hive
|
||||
ensuring deletion even if an exception occurs.
|
||||
"""
|
||||
|
||||
def __init__(self, hive, db_name):
|
||||
self.hive = hive
|
||||
self.db_name = db_name
|
||||
|
||||
def __enter__(self):
|
||||
self.hive.run_stmt_in_hive(
|
||||
'create database if not exists ' + self.db_name)
|
||||
return self.db_name
|
||||
|
||||
def __exit__(self, typ, value, traceback):
|
||||
self.hive.run_stmt_in_hive(
|
||||
'drop database if exists %s cascade' % self.db_name)
|
||||
|
||||
|
||||
class HiveTableWrapper(object):
|
||||
"""
|
||||
A wrapper class for using `with` guards with tables created through Hive
|
||||
ensuring deletion even if an exception occurs.
|
||||
"""
|
||||
|
||||
def __init__(self, hive, table_name, table_spec):
|
||||
self.hive = hive
|
||||
self.table_name = table_name
|
||||
self.table_spec = table_spec
|
||||
|
||||
def __enter__(self):
|
||||
self.hive.run_stmt_in_hive(
|
||||
'create table if not exists %s %s' %
|
||||
(self.table_name, self.table_spec))
|
||||
return self.table_name
|
||||
|
||||
def __exit__(self, typ, value, traceback):
|
||||
self.hive.run_stmt_in_hive('drop table if exists %s' % self.table_name)
|
||||
Reference in New Issue
Block a user