From 68a9630adcf9bf064ba64304efab788f6e2a165d Mon Sep 17 00:00:00 2001 From: stiga-huang Date: Fri, 12 Dec 2025 17:27:30 +0800 Subject: [PATCH] IMPALA-14284: Log the actual log files instead of symlinks in start-impala-cluster.py It's not that easy to find log files of a custom-cluster test. All custom-cluster tests use the same log dir and the test output just shows the symlink of the log files, e.g. "Starting State Store logging to .../logs/custom_cluster_tests/statestored.INFO". This patch prints the actual log file names after the cluster launchs. An example output: 15:17:19 MainThread: Starting State Store logging to /tmp/statestored.INFO 15:17:19 MainThread: Starting Catalog Service logging to /tmp/catalogd.INFO 15:17:19 MainThread: Starting Impala Daemon logging to /tmp/impalad.INFO 15:17:19 MainThread: Starting Impala Daemon logging to /tmp/impalad_node1.INFO 15:17:19 MainThread: Starting Impala Daemon logging to /tmp/impalad_node2.INFO ... 15:17:24 MainThread: Total wait: 2.54s 15:17:24 MainThread: Actual log file names: 15:17:24 MainThread: statestored.INFO -> statestored.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094348 15:17:24 MainThread: catalogd.INFO -> catalogd.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094368 15:17:24 MainThread: impalad.INFO -> impalad.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094466 15:17:24 MainThread: impalad_node1.INFO -> impalad.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094468 15:17:24 MainThread: impalad_node2.INFO -> impalad.quanlong-Precision-3680.quanlong.log.INFO.20251216-151719.1094470 15:17:24 MainThread: Impala Cluster Running with 3 nodes (3 coordinators, 3 executors). Tests - Ran the script locally. - Ran a failed custom-cluster test and verified the actual file names are printed in the output. Change-Id: Id76c0a8bdfb221ab24ee315e2e273abca4257398 Reviewed-on: http://gerrit.cloudera.org:8080/23781 Reviewed-by: Impala Public Jenkins Tested-by: Quanlong Huang --- bin/start-impala-cluster.py | 50 +++++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py index e6042417f..e7d2be2b9 100755 --- a/bin/start-impala-cluster.py +++ b/bin/start-impala-cluster.py @@ -249,6 +249,21 @@ def check_process_exists(binary, attempts=1): return False +def print_actual_log_file(log_symlink, timeout=5): + """Resolve the log symlink to get the actual log file name.""" + symlink_name = os.path.basename(log_symlink) + for _ in range(timeout): + if os.path.exists(log_symlink): + try: + actual_path = os.path.realpath(log_symlink) + actual_file = os.path.basename(actual_path) + LOG.info("{0} -> {1}".format(symlink_name, actual_file)) + return + except OSError as e: + LOG.error("Error resolving log symlink {0}: {1}".format(log_symlink, e)) + return + sleep(1) + def run_daemon_with_options(daemon_binary, args, output_file, jvm_debug_port=None): """Wrapper around run_daemon() with options determined from command-line options.""" env_vars = {"JAVA_TOOL_OPTIONS": build_java_tool_options(jvm_debug_port)} @@ -762,6 +777,10 @@ class MiniClusterOperations(object): implementation. TODO: much of this logic could be moved into ImpalaCluster. """ + def __init__(self): + # Track log symlinks to print after cluster is ready + self.log_symlinks = [] + def get_cluster(self): """Return an ImpalaCluster instance.""" return ImpalaCluster(use_admission_service=options.enable_admission_service, @@ -790,15 +809,15 @@ class MiniClusterOperations(object): statestored_arg_lists = build_statestored_arg_list(num_statestored, remap_ports=True) for i in range(num_statestored): service_name = statestored_service_name(i) - LOG.info( - "Starting State Store logging to {log_dir}/{service_name}.INFO".format( - log_dir=options.log_dir, service_name=service_name)) + log_symlink = os.path.join(options.log_dir, "{0}.INFO".format(service_name)) + LOG.info("Starting State Store logging to {0}".format(log_symlink)) output_file = os.path.join( options.log_dir, "{service_name}-out.log".format(service_name=service_name)) run_daemon_with_options("statestored", statestored_arg_lists[i], output_file) if not check_process_exists("statestored", 10): raise RuntimeError("Unable to start statestored. Check log or file permissions" " for more details.") + self.log_symlinks.append(log_symlink) def start_catalogd(self): if options.no_catalogd: @@ -810,9 +829,8 @@ class MiniClusterOperations(object): catalogd_arg_lists = build_catalogd_arg_list(num_catalogd, remap_ports=True) for i in range(num_catalogd): service_name = catalogd_service_name(i) - LOG.info( - "Starting Catalog Service logging to {log_dir}/{service_name}.INFO".format( - log_dir=options.log_dir, service_name=service_name)) + log_symlink = os.path.join(options.log_dir, "{0}.INFO".format(service_name)) + LOG.info("Starting Catalog Service logging to {0}".format(log_symlink)) output_file = os.path.join( options.log_dir, "{service_name}-out.log".format(service_name=service_name)) run_daemon_with_options("catalogd", catalogd_arg_lists[i], output_file, @@ -820,15 +838,17 @@ class MiniClusterOperations(object): if not check_process_exists("catalogd", 10): raise RuntimeError("Unable to start catalogd. Check log or file permissions" " for more details.") + self.log_symlinks.append(log_symlink) def start_admissiond(self): - LOG.info("Starting Admission Control Service logging to {log_dir}/admissiond.INFO" - .format(log_dir=options.log_dir)) + log_symlink = os.path.join(options.log_dir, "admissiond.INFO") + LOG.info("Starting Admission Control Service logging to {0}".format(log_symlink)) output_file = os.path.join(options.log_dir, "admissiond-out.log") run_daemon_with_options("admissiond", build_admissiond_arg_list(), output_file) if not check_process_exists("admissiond", 10): raise RuntimeError("Unable to start admissiond. Check log or file permissions" " for more details.") + self.log_symlinks.append(log_symlink) def start_impalads(self, cluster_size, num_coordinators, use_exclusive_coordinators, start_idx=0): @@ -849,12 +869,13 @@ class MiniClusterOperations(object): assert cluster_size == len(impalad_arg_lists) for i in range(start_idx, start_idx + cluster_size): service_name = impalad_service_name(i) - LOG.info("Starting Impala Daemon logging to {log_dir}/{service_name}.INFO".format( - log_dir=options.log_dir, service_name=service_name)) + log_symlink = os.path.join(options.log_dir, "{0}.INFO".format(service_name)) + LOG.info("Starting Impala Daemon logging to {0}".format(log_symlink)) output_file = os.path.join( options.log_dir, "{service_name}-out.log".format(service_name=service_name)) run_daemon_with_options("impalad", impalad_arg_lists[i - start_idx], jvm_debug_port=DEFAULT_IMPALAD_JVM_DEBUG_PORT + i, output_file=output_file) + self.log_symlinks.append(log_symlink) class DockerMiniClusterOperations(object): @@ -873,6 +894,8 @@ class DockerMiniClusterOperations(object): """ def __init__(self, network_name): self.network_name = network_name + # Track log symlinks to print after cluster is ready (not used in docker mode) + self.log_symlinks = [] # Make sure that the network actually exists. check_call(["docker", "network", "inspect", network_name]) @@ -1262,6 +1285,13 @@ if __name__ == "__main__": expected_num_ready_impalads = options.cluster_size expected_cluster_size = options.cluster_size impala_cluster.wait_until_ready(expected_cluster_size, expected_num_ready_impalads) + + # Print actual log files after cluster is ready + if cluster_ops.log_symlinks: + LOG.info("Actual log file names:") + for log_symlink in cluster_ops.log_symlinks: + print_actual_log_file(log_symlink) + except Exception as e: LOG.exception("Error starting cluster") sys.exit(1)