diff --git a/shell/impala_shell.py b/shell/impala_shell.py index cbc28f2a0..e76f09599 100755 --- a/shell/impala_shell.py +++ b/shell/impala_shell.py @@ -1282,58 +1282,61 @@ class ImpalaShell(cmd.Cmd, object): if not summary: return - if summary.is_queued: - queued_msg = "Query queued. Latest queuing reason: %s\n" % summary.queued_reason - self.progress_stream.write(queued_msg) - self.last_summary = time.time() - return - - data = "" - if summary.error_logs: - for error_line in summary.error_logs: - data += error_line + "\n" - if self.webserver_address: - query_id_search = re.search("Retrying query using query id: (.*)", - error_line) - if query_id_search and len(query_id_search.groups()) == 1: - retried_query_id = query_id_search.group(1) - data += "Retried query link: %s\n"\ - % self.imp_client.get_query_link(retried_query_id) - - if summary.progress: - progress = summary.progress - - # If the data is not complete return and wait for a good result. - if not progress.total_scan_ranges and not progress.num_completed_scan_ranges and \ - not progress.total_fragment_instances and \ - not progress.num_completed_fragment_instances: - self.last_summary = time.time() - return - - if self.live_progress and progress.total_scan_ranges > 0: - val = ((summary.progress.num_completed_scan_ranges * 100) - // summary.progress.total_scan_ranges) - scan_progress_text =\ - " Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val) - data += scan_progress_text - - if self.live_progress and progress.total_fragment_instances > 0: - val = ((progress.num_completed_fragment_instances * 100) - // progress.total_fragment_instances) - query_progress_text =\ - "Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val) - data += query_progress_text - - if self.live_summary: - table = self._default_summary_table() - output = [] - self.imp_client.build_summary_table(summary, 0, False, 0, False, output) - formatter = PrettyOutputFormatter(table) - data += formatter.format(output) + "\n" - - self.progress_stream.write(data) + summary_str = self._format_periodic_summary(summary) + if summary_str: + self.progress_stream.write(summary_str) self.last_summary = time.time() + def _format_periodic_summary(self, summary): + if summary.is_queued: + return "Query queued. Latest queuing reason: %s\n" % summary.queued_reason + + data = "" + if summary.error_logs: + for error_line in summary.error_logs: + data += error_line + "\n" + if self.webserver_address: + query_id_search = re.search("Retrying query using query id: (.*)", + error_line) + if query_id_search and len(query_id_search.groups()) == 1: + retried_query_id = query_id_search.group(1) + data += "Retried query link: %s\n"\ + % self.imp_client.get_query_link(retried_query_id) + + if summary.progress: + progress = summary.progress + + has_scan_progress = progress.num_completed_scan_ranges is not None \ + and progress.total_scan_ranges is not None \ + and progress.total_scan_ranges > 0 + + if self.live_progress and has_scan_progress: + val = ((summary.progress.num_completed_scan_ranges * 100) + // summary.progress.total_scan_ranges) + scan_progress_text =\ + " Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val) + data += scan_progress_text + + has_query_progress = progress.num_completed_fragment_instances is not None \ + and progress.total_fragment_instances is not None \ + and progress.total_fragment_instances > 0 + + if self.live_progress and has_query_progress: + val = ((progress.num_completed_fragment_instances * 100) + // progress.total_fragment_instances) + query_progress_text =\ + "Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val) + data += query_progress_text + + if self.live_summary: + table = self._default_summary_table() + output = [] + self.imp_client.build_summary_table(summary, 0, False, 0, False, output) + formatter = PrettyOutputFormatter(table) + data += formatter.format(output) + "\n" + + return data + def _default_summary_table(self): return self.construct_table_with_header(["Operator", "#Hosts", "#Inst", "Avg Time", "Max Time", "#Rows",