Stress test: Fix stack trace collection

A lot of stuff got messed up during the switch to the cluster model...

Changes:
1) find_crashed_impalads() returned a list but the caller expected a
   dict.
2) for_each_impalad() ignored the parameter 'impalads' and instead used
   all impalads in the cluster.
3) find_last_backtrace() returned the oldest core dump instead of the
   newest.
4) num_successive_errors_needed_to_abort was effectively hard-coded to
   2. I'm not sure how that happened.
5) Catch EOFError when getting a query from the work queue. This happens
   when the work queue is shutdown but there are workers waiting for an
   item.
6) Ignore connection errors due to an unresponsive impalad. When the
   load on an impalad get very high it randomly stops responding to
   client requests. Reducing the load seems to help.
7) Added various log messages.

Change-Id: Icb823dc47a51874b0f8a0b20f966a556752f7796
Reviewed-on: http://gerrit.cloudera.org:8080/2176
Reviewed-by: Casey Ching <casey@cloudera.com>
Tested-by: Casey Ching <casey@cloudera.com>
This commit is contained in:
Casey Ching
2016-02-03 14:57:25 -08:00
parent 583d31a906
commit 31ccd85605
2 changed files with 40 additions and 14 deletions

View File

@@ -495,7 +495,7 @@ class Impala(Service):
"""
stopped_impalads = self.find_stopped_impalads()
if not stopped_impalads:
return stopped_impalads
return dict.fromkeys(stopped_impalads)
messages = OrderedDict()
impalads_with_message = dict()
for i, message in izip(stopped_impalads, self.for_each_impalad(
@@ -505,16 +505,16 @@ class Impala(Service):
else:
messages[i] = "%s crashed but no info could be found" % i.host_name
messages.update(impalads_with_message)
return stopped_impalads
return messages
def for_each_impalad(self, func, impalads=None, as_dict=False):
if impalads is None:
impalads = self.impalads
promise = self._thread_pool.map_async(func, self.impalads)
promise = self._thread_pool.map_async(func, impalads)
# Python doesn't handle ctrl-c well unless a timeout is provided.
results = promise.get(maxint)
if as_dict:
results = dict(izip(self.impalads, results))
results = dict(izip(impalads, results))
return results
def restart(self):
@@ -623,7 +623,7 @@ class Impalad(object):
bt = self.shell("""
LAST_CORE_FILE=$(
find "{core_dump_dir}" -maxdepth 1 -name "*core*" -printf "%T@ %p\\n" \\
| sort -n | head -1 | cut -f 1 -d ' ' --complement)
| sort -n | tail -1 | cut -f 1 -d ' ' --complement)
if [[ -n "$LAST_CORE_FILE" ]]; then
MTIME=$(stat -c %Y "$LAST_CORE_FILE")
if [[ "$MTIME" -ge {start_time_unix} ]]; then