Mirror of https://github.com/apache/impala.git, synced 2025-12-19 18:12:08 -05:00
IMPALA-14074: Warmup metadata cache in catalogd for critical tables
*Background*

Catalogd starts with a cold metadata cache: only the db/table names and
functions are loaded. A table's metadata stays unloaded until a query is
submitted on the table, so the first query suffers the delay of loading the
metadata. There is a flag, --load_catalog_in_background, to let catalogd
eagerly load metadata of all tables even if no queries come. However, catalogd
may then load metadata for tables that are never used, increasing the catalog
size and consequently the memory usage. So this flag is off by default and not
recommended for production use. Users still need the metadata of some critical
tables to be loaded; until that happens the service is not considered ready,
since important queries might fail with timeouts. When Catalogd HA is enabled,
the standby catalogd is also required to have an up-to-date metadata cache so
it can smoothly take over from the active one when a failover happens.

*New Flags*

This patch adds a startup flag for catalogd to specify a config file listing
the tables whose metadata should be loaded. Catalogd adds them to the table
loading queue in the background when a catalog reset happens, i.e. at catalogd
startup or when a global INVALIDATE METADATA runs. The flag is
--warmup_tables_config_file. The value can be a path in the local FS or in
remote storage (e.g. HDFS), e.g.
--warmup_tables_config_file=file:///opt/impala/warmup_table_list.txt
--warmup_tables_config_file=hdfs:///tmp/warmup_table_list.txt

Each line in the config file is either a fully qualified table name or a
wildcard under a db, e.g. "tpch.*". Catalogd loads the table names at startup
and schedules loading on them after each reset of the catalog. Tables are
scheduled in the order they appear in the config file, so important tables can
be placed first. Comment lines starting with "#" or "//" are ignored.

Another flag, --keeps_warmup_tables_loaded (defaults to false), controls
whether to reload a warmup table after it has been invalidated, either
explicitly by an INVALIDATE METADATA <table> command or implicitly by
CatalogdTableInvalidator or HMS RELOAD events. When CatalogdTableInvalidator
is enabled with --invalidate_tables_on_memory_pressure=true, users shouldn't
set keeps_warmup_tables_loaded to true unless the catalogd heap is large
enough to cache the metadata of all these tables; otherwise, these tables will
keep being loaded and invalidated.

*Catalogd HA Changes*

When Catalogd HA is enabled, the standby catalogd also resets its catalog and
starts loading the metadata of these tables once the HA state (active/standby)
is determined. The standby catalogd keeps its metadata cache up to date by
applying HMS notification events. To support a warmed-up switchover,
--catalogd_ha_reset_metadata_on_failover should be set to false.

*Limitation*

The standby catalogd can still have a stale cache if there are operations in
the active catalogd that don't trigger HMS notification events, or if an HMS
notification event is not applied correctly. E.g. adding a new native function
generates an ALTER_DATABASE event, but applying the event does not refresh the
db's native function list (IMPALA-14210). These issues will be resolved in
separate JIRAs.

*Tests*
- Added FE unit tests.
- Added e2e tests for local/HDFS config files.
- Added an e2e test to verify the standby catalogd has a warmed-up cache when
  failover happens.
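For illustration, a minimal config file in the format described under
*New Flags* could look like this (db and table names are hypothetical):

    # Important tables first: they are scheduled in file order.
    sales.orders
    sales.customers
    // Wildcard: warm up every table under the reporting db.
    reporting.*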
Change-Id: I2d09eae1f12a8acd2de945984d956d11eeee1ab6
Reviewed-on: http://gerrit.cloudera.org:8080/23155
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
committed by Impala Public Jenkins
parent 3b0d6277d5
commit da190f1d86
@@ -291,6 +291,17 @@ DEFINE_int32(catalog_reset_max_threads, 10,
    "catalog reset.");
DEFINE_validator(catalog_reset_max_threads, gt_0);

DEFINE_string(warmup_tables_config_file, "",
    "Path to the configuration file listing tables to warm up, i.e. loading metadata, "
    "when catalogd starts or when a global INVALIDATE METADATA finishes.");

DEFINE_bool(keeps_warmup_tables_loaded, false,
    "If set to true, catalogd will keep metadata of tables in the warmup list always "
    "loaded even if they are invalidated. Don't set this to true if the catalogd heap "
    "size is not enough to cache metadata of all these tables and "
    "--invalidate_tables_on_memory_pressure is turned on. Otherwise, these tables will "
    "keep being loaded and invalidated.");

DECLARE_string(state_store_host);
DECLARE_int32(state_store_port);
DECLARE_string(state_store_2_host);
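For illustration, the two flags defined above might be combined like this when
launching catalogd (a sketch; the path is hypothetical):

    catalogd --warmup_tables_config_file=hdfs:///tmp/warmup_table_list.txt \
        --keeps_warmup_tables_loaded=true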
@@ -853,7 +864,7 @@ void CatalogServer::UpdateActiveCatalogd(bool is_registration_reply,
    while (!must_reset) {
      catalog_update_cv_.NotifyOne();
      catalog_update_cv_.Wait(unique_lock);
      must_reset = is_active_ && !triggered_first_reset_;
      must_reset = is_ha_determined_ && !triggered_first_reset_;
    }
  }
@@ -145,6 +145,8 @@ DECLARE_int32(max_outstanding_events_on_executors);
DECLARE_bool(consolidate_grant_revoke_requests);
DECLARE_int32(reset_metadata_lock_duration_ms);
DECLARE_int32(catalog_reset_max_threads);
DECLARE_string(warmup_tables_config_file);
DECLARE_bool(keeps_warmup_tables_loaded);

// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -548,6 +550,8 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
  cfg.__set_iceberg_catalog_num_threads(FLAGS_iceberg_catalog_num_threads);
  cfg.__set_reset_metadata_lock_duration_ms(FLAGS_reset_metadata_lock_duration_ms);
  cfg.__set_catalog_reset_max_threads(FLAGS_catalog_reset_max_threads);
  cfg.__set_warmup_tables_config_file(FLAGS_warmup_tables_config_file);
  cfg.__set_keeps_warmup_tables_loaded(FLAGS_keeps_warmup_tables_loaded);
  return Status::OK();
}
@@ -173,6 +173,7 @@ testdata/data/json_test/*
testdata/data/sfs_d2.txt
testdata/data/sfs_d4.txt
testdata/data/load_data_with_catalog_v1.txt
testdata/data/warmup_table_list.txt
testdata/datasets/functional/functional_schema_template.sql
testdata/impala-profiles/README
testdata/impala-profiles/impala_profile_log_tpcds_compute_stats
@@ -345,4 +345,8 @@ struct TBackendGflags {
  156: required i32 reset_metadata_lock_duration_ms

  157: required i32 catalog_reset_max_threads

  158: required string warmup_tables_config_file

  159: required bool keeps_warmup_tables_loaded
}
@@ -46,7 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -67,7 +66,6 @@ import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.TableMeta;
@@ -308,6 +306,16 @@ public class CatalogServiceCatalog extends Catalog {

  private final boolean loadInBackground_;

  // Tables to load metadata in background when catalogd starts or global INVALIDATE
  // METADATA runs. Supports wildcard to specify all tables under a db. Tables will be
  // loaded in the same order as in the config file.
  private final Set<TTableName> warmupTables_;

  // If true, warmup tables will be loaded in background after being explicitly
  // invalidated by commands or implicitly invalidated by CatalogdTableInvalidator or
  // HMS RELOAD events.
  private final boolean keepsWarmupTablesLoaded_;

  // Periodically polls HDFS to get the latest set of known cache pools.
  private final ScheduledExecutorService cachePoolReader_ =
      Executors.newScheduledThreadPool(1,
@@ -465,6 +473,10 @@ public class CatalogServiceCatalog extends Catalog {
      commonHmsEventTypes_.add(eventType);
    }
    LOG.info("Common HMS event types: " + commonHmsEventTypes_);
    keepsWarmupTablesLoaded_ = BackendConfig.INSTANCE.keepsWarmupTablesLoaded();
    warmupTables_ = FileSystemUtil.loadWarmupTableNames(
        BackendConfig.INSTANCE.getWarmupTablesConfigFile());
    LOG.info("Loaded {} table names to warm up", warmupTables_.size());
  }

  /**
@@ -2446,7 +2458,7 @@ public class CatalogServiceCatalog extends Catalog {
        catalogTimeline.markEvent("Got database list");
      }
      rebuildDbCache(allHmsDbs, unlockedTimer, catalogTimeline, isSyncDdl);

      scheduleWarmupTables();
      catalogTimeline.markEvent("Updated catalog cache");
    } catch (Exception e) {
      LOG.error("Error initializing Catalog", e);
@@ -2581,6 +2593,31 @@ public class CatalogServiceCatalog extends Catalog {
    Preconditions.checkState(versionLock_.writeLock().isHeldByCurrentThread());
  }

  private void scheduleWarmupTables() {
    Preconditions.checkState(versionLock_.writeLock().isHeldByCurrentThread());
    // If loadInBackground_ is true, all tables are already being loaded.
    if (loadInBackground_) return;
    int cnt = 0;
    for (TTableName tTblName : warmupTables_) {
      if (tTblName.table_name.equals("*")) {
        String dbName = tTblName.db_name;
        Db db = getDb(dbName);
        if (db == null) continue;
        for (String tbl : db.getAllTableNames()) {
          tableLoadingMgr_.backgroundLoad(new TTableName(dbName, tbl));
          cnt++;
          LOG.info("Scheduled warmup on table {}.{} based on wildcard", dbName, tbl);
        }
      } else {
        tableLoadingMgr_.backgroundLoad(tTblName);
        cnt++;
        LOG.info("Scheduled warmup on table {}.{}", tTblName.db_name,
            tTblName.table_name);
      }
    }
    LOG.info("Scheduled {} tables to be warmed up", cnt);
  }

  public Db addDb(String dbName, org.apache.hadoop.hive.metastore.api.Database msDb) {
    return addDb(dbName, msDb, -1);
  }
@@ -3310,10 +3347,7 @@ public class CatalogServiceCatalog extends Catalog {
          MetastoreShim.mapToInternalTableType(tblMeta.getTableType()),
          tblMeta.getComments(), eventId);
      Preconditions.checkNotNull(newTable);
      if (loadInBackground_) {
        tableLoadingMgr_.backgroundLoad(new TTableName(dbName.toLowerCase(),
            tblName.toLowerCase()));
      }
      scheduleTableLoading(dbName, tblName);
      if (dbWasAdded.getRef()) {
        // The database should always have a lower catalog version than the table because
        // it needs to be created before the table can be added.
@@ -3347,13 +3381,20 @@ public class CatalogServiceCatalog extends Catalog {
    } finally {
      versionLock_.writeLock().unlock();
    }
    if (loadInBackground_) {
      tableLoadingMgr_.backgroundLoad(
          new TTableName(dbName.toLowerCase(), tblName.toLowerCase()));
    }
    scheduleTableLoading(dbName, tblName);
    return incompleteTable;
  }

  private void scheduleTableLoading(String dbName, String tableName) {
    TTableName tTblName = new TTableName(dbName.toLowerCase(), tableName.toLowerCase());
    TTableName tDbName = new TTableName(dbName.toLowerCase(), "*");
    if (loadInBackground_
        || (keepsWarmupTablesLoaded_
            && (warmupTables_.contains(tTblName) || warmupTables_.contains(tDbName)))) {
      tableLoadingMgr_.backgroundLoad(tTblName);
    }
  }

  /**
   * Refresh table if exists. Returns true if reloadTable() succeeds, false
   * otherwise.
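To make the matching rule in scheduleTableLoading() above concrete, here is a
minimal standalone sketch (not part of the patch; the class and names are
invented) that mirrors the predicate with plain strings instead of TTableName:

    import java.util.Set;

    // Hypothetical sketch of the warmup matching rule: a table stays warm if it
    // is listed explicitly ("db.table") or its db carries a wildcard ("db.*").
    class WarmupMatchSketch {
      static boolean inWarmupList(Set<String> warmupList, String db, String tbl) {
        String exact = db.toLowerCase() + "." + tbl.toLowerCase();
        String wildcard = db.toLowerCase() + ".*";
        return warmupList.contains(exact) || warmupList.contains(wildcard);
      }

      public static void main(String[] args) {
        Set<String> list = Set.of("sales.orders", "reporting.*");
        System.out.println(inWarmupList(list, "Sales", "Orders"));  // true: exact match
        System.out.println(inWarmupList(list, "reporting", "t1"));  // true: db wildcard
        System.out.println(inWarmupList(list, "other", "t2"));      // false
      }
    }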
@@ -43,17 +43,21 @@ import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.impala.catalog.HdfsCompression;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TTableName;
import org.apache.impala.util.DebugUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -1263,4 +1267,31 @@ public class FileSystemUtil {
      throw new NoSuchElementException("No more entries");
    }
  }

  public static Set<TTableName> loadWarmupTableNames(String configFile) {
    if (configFile == null || configFile.isEmpty()) return Collections.emptySet();
    // Use LinkedHashSet to keep the order of the config file.
    Set<TTableName> warmupTables = new LinkedHashSet<>();
    Path path = new Path(configFile);
    try {
      FileSystem fs = path.getFileSystem(new Configuration());
      // try-with-resources so the reader is closed even if parsing fails.
      try (BufferedReader reader =
              new BufferedReader(new InputStreamReader(fs.open(path)))) {
        String line;
        while ((line = reader.readLine()) != null) {
          line = line.trim();
          // Skip blank lines and comments so they don't trigger the warning below.
          if (line.isEmpty() || line.startsWith("#") || line.startsWith("//")) continue;
          String[] names = line.split("\\.");
          if (names.length != 2) {
            LOG.warn("Skipped illegal table name in warmup list: " + line);
            continue;
          }
          warmupTables.add(new TTableName(names[0].toLowerCase(), names[1].toLowerCase()));
        }
      }
      return warmupTables;
    } catch (IOException e) {
      LOG.warn("Failed to load the list of warmup tables", e);
    }
    return Collections.emptySet();
  }
}
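As a usage sketch for loadWarmupTableNames() (the path is hypothetical, and
the package of FileSystemUtil is assumed from the Impala FE source tree):

    import java.util.Set;
    import org.apache.impala.common.FileSystemUtil;  // package assumed
    import org.apache.impala.thrift.TTableName;

    class WarmupListDump {
      public static void main(String[] args) {
        // Parse a local warmup list; entries keep file order via the LinkedHashSet above.
        Set<TTableName> tables =
            FileSystemUtil.loadWarmupTableNames("file:///tmp/warmup_table_list.txt");
        for (TTableName t : tables) {
          System.out.println(t.db_name + "." + t.table_name);
        }
      }
    }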
@@ -589,4 +589,12 @@ public class BackendConfig {
  public int getCatalogResetMaxThreads() {
    return backendCfg_.catalog_reset_max_threads;
  }

  public String getWarmupTablesConfigFile() {
    return backendCfg_.warmup_tables_config_file;
  }

  public boolean keepsWarmupTablesLoaded() {
    return backendCfg_.keeps_warmup_tables_loaded;
  }
}
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.impala.thrift.TTableName;
import org.junit.Test;
import org.mockito.Mockito;
@@ -35,6 +36,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

/**
 * Tests for the various util methods in FileSystemUtil class
@@ -231,6 +233,22 @@ public class FileSystemUtilTest {
    }
  }

  @Test
  public void testLoadingWarmupTableNames() {
    String configFile = String.format(
        "file://%s/testdata/data/warmup_table_list.txt", System.getenv("IMPALA_HOME"));
    Set<TTableName> tableNames = FileSystemUtil.loadWarmupTableNames(configFile);
    String db = "tpcds";
    String[] tables = {"customer", "date_dim", "item", "store_sales"};
    for (String table : tables) {
      assertTrue(table + " not found", tableNames.contains(new TTableName(db, table)));
    }
    assertTrue(tableNames.contains(new TTableName("tpch", "*")));
    assertTrue(tableNames.contains(new TTableName("functional", "#")));
    assertTrue(tableNames.contains(new TTableName("functional", "alltypes etc #")));
    assertEquals(tables.length + 3, tableNames.size());
  }

  private boolean testIsInIgnoredDirectory(Path input) {
    return testIsInIgnoredDirectory(input, true);
  }
testdata/bin/create-load-data.sh (vendored, 3 changed lines)
@@ -226,6 +226,9 @@ function load-custom-schemas {
  # File used by CreateTableLikeOrc tests
  ln -s ${IMPALA_HOME}/testdata/data/alltypes_non_acid.orc ${SCHEMA_TMP_DIR}

  ln -s ${IMPALA_HOME}/testdata/data/warmup_table_list.txt \
      ${TMP_DIR}/warmup_table_list.txt

  hadoop fs -put -f ${TMP_DIR}/* /test-warehouse

  rm -r ${TMP_DIR}
testdata/data/warmup_table_list.txt (vendored, new file, 24 lines)
@@ -0,0 +1,24 @@
# Tables are scheduled in the order they appear in this file.
# Put the important tables first.
tpcds.store_sales

# Wildcard can be used to match all tables under a db.
tpch.*

tpcds.customer
tpcds.date_dim

# Table names will be converted to lowercase
tpcds.Item

# Comments are ignored
#tpcds.store_returns
//tpcds.store_

# Invalid entries are also ignored
tpcds
tpch.

# These will be treated as table names but ignored in loading since they don't exist
functional.#
functional.alltypes etc #
@@ -590,6 +590,26 @@ class CatalogdService(BaseImpalaService):
  def get_catalog_service_port(self):
    return self.service_port

  def verify_table_metadata_loaded(self, db, table, expect_loaded=True, timeout_s=10):
    page_url = "table_metrics?json&name=%s.%s" % (db, table)
    start_time = time()
    while (time() - start_time < timeout_s):
      response = self.open_debug_webpage(page_url)
      assert response.status_code == requests.codes.ok
      response_json = json.loads(response.text)
      assert "table_metrics" in response_json
      table_metrics = response_json["table_metrics"]
      if expect_loaded:
        if "Table not yet loaded" not in table_metrics:
          return
        LOG.info("Table {0}.{1} is not loaded".format(db, table))
      else:
        if "Table not yet loaded" in table_metrics:
          return
        LOG.info("Table {0}.{1} is loaded".format(db, table))
      sleep(1)
    assert False, "Timeout waiting for table {0}.{1} to be {2}".format(
        db, table, "loaded" if expect_loaded else "not loaded")

class AdmissiondService(BaseImpalaService):
  def __init__(self, hostname, webserver_interface, webserver_port,
@@ -1626,7 +1626,7 @@ class ImpalaTestSuite(BaseTestSuite):
    pattern = re.compile(line_regex, re.DOTALL)

    for i in range(0, timeout_s):
      log_file_path = self.__build_log_path(daemon, level)
      log_file_path = self.build_log_path(daemon, level)
      with open(log_file_path) as log:
        ret = pattern.search(log.read())
        if ret is not None:
@@ -1664,7 +1664,7 @@ class ImpalaTestSuite(BaseTestSuite):
    while True:
      try:
        found = 0
        log_file_path = self.__build_log_path(daemon, level)
        log_file_path = self.build_log_path(daemon, level)
        last_re_result = None
        with open(log_file_path, 'rb') as log_file:
          for line in log_file:
@@ -1722,7 +1722,7 @@ class ImpalaTestSuite(BaseTestSuite):
      pytest.fail("{}[{}]={} does not match against dimension {}={}.".format(
          EXEC_OPTION, name, exec_option[name], name, vector.get_value(name)))

  def __build_log_path(self, daemon, level):
  def build_log_path(self, daemon, level):
    """Builds a path to a log file for a particular daemon. Does not assert that file
    actually exists.
@@ -1754,7 +1754,7 @@ class ImpalaTestSuite(BaseTestSuite):

    Return: nothing
    """
    actual_log_path = self.__build_log_path(daemon, level)
    actual_log_path = self.build_log_path(daemon, level)

    def exists_func():
      return os.path.exists(actual_log_path)
@@ -28,7 +28,7 @@ from tests.common.environ import build_flavor_timeout
from tests.common.impala_connection import ERROR
from tests.common.parametrize import UniqueDatabase
from tests.common.test_vector import HS2
from tests.util.filesystem_utils import IS_S3, get_fs_path
from tests.util.filesystem_utils import IS_S3, get_fs_path, FILESYSTEM_PREFIX
from time import sleep

LOG = logging.getLogger('catalogd_ha_test')
@@ -525,7 +525,31 @@ class TestCatalogdHA(CustomClusterTestSuite):
      catalogd_args="--catalogd_ha_reset_metadata_on_failover=true",
      start_args="--enable_catalogd_ha")
  def test_metadata_after_failover(self, unique_database):
    """Verify that the metadata is correct after failover."""
    self._test_metadata_after_failover(unique_database)

  @CustomClusterTestSuite.with_args(
      statestored_args="--use_subscriber_id_as_catalogd_priority=true",
      catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
                    "--warmup_tables_config_file="
                    "{0}/test-warehouse/warmup_table_list.txt".format(FILESYSTEM_PREFIX),
      start_args="--enable_catalogd_ha")
  def test_warmed_up_metadata_after_failover(self, unique_database):
    """Verify that the metadata is warmed up in the standby catalogd."""
    for catalogd in self.__get_catalogds():
      self._test_warmed_up_tables(catalogd.service)
    latest_catalogd = self._test_metadata_after_failover(unique_database, True)
    self._test_warmed_up_tables(latest_catalogd)

  def _test_warmed_up_tables(self, catalogd):
    db = "tpcds"
    tables = ["customer", "date_dim", "item", "store_sales"]
    for table in tables:
      catalogd.verify_table_metadata_loaded(db, table)
    catalogd.verify_table_metadata_loaded(db, "store", expect_loaded=False)

  def _test_metadata_after_failover(self, unique_database, skip_func_test=False):
    """Verify that the metadata is correct after failover. Returns the current active
    catalogd"""
    (active_catalogd, standby_catalogd) = self.__get_catalogds()
    catalogd_service_2 = standby_catalogd.service
@@ -537,6 +561,8 @@ class TestCatalogdHA(CustomClusterTestSuite):
    self.execute_query_expect_success(
        self.client, "select %s.identity_tmp(10)" % unique_database)

    self.client.execute("create table %s.tbl(i int)" % unique_database)

    # Kill active catalogd
    active_catalogd.kill()
@@ -548,8 +574,15 @@ class TestCatalogdHA(CustomClusterTestSuite):
        "catalog-server.ha-number-active-status-change") > 0
    assert catalogd_service_2.get_metric_value("catalog-server.active-status")

    self.execute_query_expect_success(
        self.client, "select %s.identity_tmp(10)" % unique_database)
    # TODO: due to IMPALA-14210 the standby catalogd can't update the native function
    # list by applying the ALTER_DATABASE event. So this will fail as function not found.
    # Remove this condition after IMPALA-14210 is resolved.
    if not skip_func_test:
      self.execute_query_expect_success(
          self.client, "select %s.identity_tmp(10)" % unique_database)

    self.execute_query_expect_success(self.client, "describe %s.tbl" % unique_database)
    return catalogd_service_2

  def test_page_with_disable_ha(self):
    self.__test_catalog_ha_info_page()
@@ -34,6 +34,7 @@ import pytest
from impala_thrift_gen.TCLIService import TCLIService
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout
from tests.common.file_utils import grep_file
from tests.common.impala_connection import (
    ERROR,
    FINISHED,
@@ -42,6 +43,7 @@ from tests.common.impala_connection import (
)
from tests.common.skip import SkipIfFS, SkipIfNotHdfsMinicluster
from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
from tests.util.filesystem_utils import FILESYSTEM_PREFIX

LOG = logging.getLogger(__name__)
@@ -1115,3 +1117,66 @@ class TestGracefulShutdown(CustomClusterTestSuite, HS2TestSuite):
      impalad.wait()
      shutdown_duration = time.time() - start_time
      assert shutdown_duration <= self.IDLE_SHUTDOWN_GRACE_PERIOD_S + 10


class TestWarmupCatalog(CustomClusterTestSuite):

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      catalogd_args="-logbuflevel=-1 --warmup_tables_config_file=file://%s/testdata/data/"
                    "warmup_table_list.txt --keeps_warmup_tables_loaded=false" %
                    os.environ['IMPALA_HOME'])
  def test_warmup_tables_local_config_file(self):
    self._test_warmup_tables(False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      catalogd_args="-logbuflevel=-1 --warmup_tables_config_file={0}/test-warehouse"
                    "/warmup_table_list.txt --keeps_warmup_tables_loaded=false"
                    .format(FILESYSTEM_PREFIX))
  def test_warmup_tables_hdfs_config_file(self):
    self._test_warmup_tables(False)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      catalogd_args="-logbuflevel=-1 --warmup_tables_config_file={0}/test-warehouse"
                    "/warmup_table_list.txt --keeps_warmup_tables_loaded=true"
                    .format(FILESYSTEM_PREFIX))
  def test_keeps_warmup_tables_loaded(self):
    self._test_warmup_tables(True)

  def _test_warmup_tables(self, keeps_warmup_tables_loaded):
    catalogd = self.cluster.catalogd.service
    self._verify_tables_warmed_up(catalogd)
    self._verify_warmup_scheduling_order()
    self.execute_query("invalidate metadata")
    self._verify_tables_warmed_up(catalogd)
    self.execute_query("invalidate metadata tpcds.item")
    catalogd.verify_table_metadata_loaded("tpcds", "item", keeps_warmup_tables_loaded)

  def _verify_tables_warmed_up(self, catalogd):
    tables = {
        "tpcds": ["customer", "date_dim", "item", "store_sales"],
        "tpch": ["customer", "lineitem", "nation", "orders", "part",
                 "partsupp", "region", "supplier"]
    }
    for db in tables:
      for table in tables[db]:
        catalogd.verify_table_metadata_loaded(db, table)
    catalogd.verify_table_metadata_loaded("tpcds", "store", expect_loaded=False)

  def _verify_warmup_scheduling_order(self):
    self.assert_catalogd_log_contains("INFO", "Scheduled 14 tables to be warmed up")
    with open(self.build_log_path("catalogd", "INFO")) as file:
      logs = grep_file(file, r"Scheduled warmup on table")
    assert "tpcds.store_sales" in logs[0]
    # The order in "tpch" depends on the list tables output and might change in the
    # future. So just verify the db names.
    for i in range(1, 9):
      assert "tpch." in logs[i]
    assert "tpcds.customer" in logs[9]
    assert "tpcds.date_dim" in logs[10]
    assert "tpcds.item" in logs[11]
    assert "functional.#" in logs[12]
    assert "functional.alltypes etc #" in logs[13]
    assert len(logs) == 14