mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
IMPALA-12978: Fix impala-shell`s live progress with older Impalas
If the Impala server has an older version that does not contain IMPALA-12048 then TExecProgress.total_fragment_instances will be None, leading to error when checking total_fragment_instances > 0. Note that this issue only comes with Python 3, in Python 2 None > 0 returns False. Testing: - Manually checked with a modified Impala that doesn't set total_fragment_instances. Only the scanner progress bar is shown in this case. Change-Id: Ic6562ff6c908bfebd09b7612bc5bcbd92623a8e6 Reviewed-on: http://gerrit.cloudera.org:8080/21256 Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Reviewed-by: Zihao Ye <eyizoha@163.com>
This commit is contained in:
committed by
Zihao Ye
parent
07218588a6
commit
5c003cdcda
@@ -1282,58 +1282,61 @@ class ImpalaShell(cmd.Cmd, object):
|
|||||||
if not summary:
|
if not summary:
|
||||||
return
|
return
|
||||||
|
|
||||||
if summary.is_queued:
|
summary_str = self._format_periodic_summary(summary)
|
||||||
queued_msg = "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
|
if summary_str:
|
||||||
self.progress_stream.write(queued_msg)
|
self.progress_stream.write(summary_str)
|
||||||
self.last_summary = time.time()
|
|
||||||
return
|
|
||||||
|
|
||||||
data = ""
|
|
||||||
if summary.error_logs:
|
|
||||||
for error_line in summary.error_logs:
|
|
||||||
data += error_line + "\n"
|
|
||||||
if self.webserver_address:
|
|
||||||
query_id_search = re.search("Retrying query using query id: (.*)",
|
|
||||||
error_line)
|
|
||||||
if query_id_search and len(query_id_search.groups()) == 1:
|
|
||||||
retried_query_id = query_id_search.group(1)
|
|
||||||
data += "Retried query link: %s\n"\
|
|
||||||
% self.imp_client.get_query_link(retried_query_id)
|
|
||||||
|
|
||||||
if summary.progress:
|
|
||||||
progress = summary.progress
|
|
||||||
|
|
||||||
# If the data is not complete return and wait for a good result.
|
|
||||||
if not progress.total_scan_ranges and not progress.num_completed_scan_ranges and \
|
|
||||||
not progress.total_fragment_instances and \
|
|
||||||
not progress.num_completed_fragment_instances:
|
|
||||||
self.last_summary = time.time()
|
|
||||||
return
|
|
||||||
|
|
||||||
if self.live_progress and progress.total_scan_ranges > 0:
|
|
||||||
val = ((summary.progress.num_completed_scan_ranges * 100)
|
|
||||||
// summary.progress.total_scan_ranges)
|
|
||||||
scan_progress_text =\
|
|
||||||
" Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
|
|
||||||
data += scan_progress_text
|
|
||||||
|
|
||||||
if self.live_progress and progress.total_fragment_instances > 0:
|
|
||||||
val = ((progress.num_completed_fragment_instances * 100)
|
|
||||||
// progress.total_fragment_instances)
|
|
||||||
query_progress_text =\
|
|
||||||
"Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
|
|
||||||
data += query_progress_text
|
|
||||||
|
|
||||||
if self.live_summary:
|
|
||||||
table = self._default_summary_table()
|
|
||||||
output = []
|
|
||||||
self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
|
|
||||||
formatter = PrettyOutputFormatter(table)
|
|
||||||
data += formatter.format(output) + "\n"
|
|
||||||
|
|
||||||
self.progress_stream.write(data)
|
|
||||||
self.last_summary = time.time()
|
self.last_summary = time.time()
|
||||||
|
|
||||||
|
def _format_periodic_summary(self, summary):
|
||||||
|
if summary.is_queued:
|
||||||
|
return "Query queued. Latest queuing reason: %s\n" % summary.queued_reason
|
||||||
|
|
||||||
|
data = ""
|
||||||
|
if summary.error_logs:
|
||||||
|
for error_line in summary.error_logs:
|
||||||
|
data += error_line + "\n"
|
||||||
|
if self.webserver_address:
|
||||||
|
query_id_search = re.search("Retrying query using query id: (.*)",
|
||||||
|
error_line)
|
||||||
|
if query_id_search and len(query_id_search.groups()) == 1:
|
||||||
|
retried_query_id = query_id_search.group(1)
|
||||||
|
data += "Retried query link: %s\n"\
|
||||||
|
% self.imp_client.get_query_link(retried_query_id)
|
||||||
|
|
||||||
|
if summary.progress:
|
||||||
|
progress = summary.progress
|
||||||
|
|
||||||
|
has_scan_progress = progress.num_completed_scan_ranges is not None \
|
||||||
|
and progress.total_scan_ranges is not None \
|
||||||
|
and progress.total_scan_ranges > 0
|
||||||
|
|
||||||
|
if self.live_progress and has_scan_progress:
|
||||||
|
val = ((summary.progress.num_completed_scan_ranges * 100)
|
||||||
|
// summary.progress.total_scan_ranges)
|
||||||
|
scan_progress_text =\
|
||||||
|
" Scan Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
|
||||||
|
data += scan_progress_text
|
||||||
|
|
||||||
|
has_query_progress = progress.num_completed_fragment_instances is not None \
|
||||||
|
and progress.total_fragment_instances is not None \
|
||||||
|
and progress.total_fragment_instances > 0
|
||||||
|
|
||||||
|
if self.live_progress and has_query_progress:
|
||||||
|
val = ((progress.num_completed_fragment_instances * 100)
|
||||||
|
// progress.total_fragment_instances)
|
||||||
|
query_progress_text =\
|
||||||
|
"Query Progress:[%s%s] %s%%\n" % ("#" * val, " " * (100 - val), val)
|
||||||
|
data += query_progress_text
|
||||||
|
|
||||||
|
if self.live_summary:
|
||||||
|
table = self._default_summary_table()
|
||||||
|
output = []
|
||||||
|
self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
|
||||||
|
formatter = PrettyOutputFormatter(table)
|
||||||
|
data += formatter.format(output) + "\n"
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
def _default_summary_table(self):
|
def _default_summary_table(self):
|
||||||
return self.construct_table_with_header(["Operator", "#Hosts", "#Inst",
|
return self.construct_table_with_header(["Operator", "#Hosts", "#Inst",
|
||||||
"Avg Time", "Max Time", "#Rows",
|
"Avg Time", "Max Time", "#Rows",
|
||||||
|
|||||||
Reference in New Issue
Block a user