mirror of
https://github.com/apache/impala.git
synced 2025-12-26 14:02:53 -05:00
IMPALA-9182: Print the socket address of the client closing a session or cancelling a query from the WebUI
This change appends the socket address (HOST:PORT) of the client who made the request to close a session or cancel a query from the coordinator's debug WebUI. Existing statuses: "Cancelled from Impala's debug web interface" "Session closed from Impala's debug web interface" New statuses: "Cancelled from Impala's debug web interface by client at <host>:<port>" "Session closed from Impala's debug web interface by client at <host>:<port>" Testing: -Verified visually that the status message is printed in the impalad log with the socket address when one cancels a query or closes a session. -Added a new e2e test to verify that the new status gets printed in runtime profile and coordinator log when a query is cancelled in this way. -Made log asserts more robust by adding a timeout/wait value. Change-Id: Icf74ad06ce1c40fab4ce37de6b7ca78e3e520b43 Reviewed-on: http://gerrit.cloudera.org:8080/14782 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
8ddbc1807a
commit
d96dab20dd
@@ -65,6 +65,9 @@ class WebCallbackRegistry {
|
||||
|
||||
// In the case of a POST, the posted data.
|
||||
std::string post_data;
|
||||
|
||||
// The socket address of the requester, <host>:<port>.
|
||||
std::string source_socket;
|
||||
};
|
||||
|
||||
// A response to an HTTP request whose body is rendered by template.
|
||||
|
||||
@@ -212,7 +212,8 @@ void ImpalaHttpHandler::CancelQueryHandler(const Webserver::WebRequest& req,
|
||||
document->AddMember("error", error, document->GetAllocator());
|
||||
return;
|
||||
}
|
||||
Status cause("Cancelled from Impala's debug web interface");
|
||||
Status cause(Substitute("Cancelled from Impala's debug web interface by client at $0"
|
||||
, req.source_socket));
|
||||
// Web UI doesn't have access to secret so we can't validate it. We assume that
|
||||
// web UI is allowed to close queries.
|
||||
status = server_->UnregisterQuery(unique_id, true, &cause);
|
||||
@@ -234,7 +235,8 @@ void ImpalaHttpHandler::CloseSessionHandler(const Webserver::WebRequest& req,
|
||||
document->AddMember("error", error, document->GetAllocator());
|
||||
return;
|
||||
}
|
||||
Status cause("Session closed from Impala's debug web interface");
|
||||
Status cause(Substitute("Session closed from Impala's debug web interface by client at"
|
||||
" $0", req.source_socket));
|
||||
// Web UI doesn't have access to secret so we can't validate it. We assume that
|
||||
// web UI is allowed to close sessions.
|
||||
status = server_->CloseSessionInternal(unique_id,
|
||||
|
||||
@@ -580,6 +580,7 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
|
||||
}
|
||||
|
||||
WebRequest req;
|
||||
req.source_socket = GetRemoteAddress(request_info).ToString();
|
||||
if (request_info->query_string != nullptr) {
|
||||
req.query_string = request_info->query_string;
|
||||
BuildArgumentMap(request_info->query_string, &req.parsed_args);
|
||||
|
||||
@@ -1147,44 +1147,61 @@ class ImpalaTestSuite(BaseTestSuite):
|
||||
"Check failed to return True after {0} tries and {1} seconds{2}".format(
|
||||
count, timeout_s, error_msg_str))
|
||||
|
||||
def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
|
||||
def assert_impalad_log_contains(self, level, line_regex, expected_count=1, timeout_s=6):
|
||||
"""
|
||||
Convenience wrapper around assert_log_contains for impalad logs.
|
||||
"""
|
||||
self.assert_log_contains("impalad", level, line_regex, expected_count)
|
||||
self.assert_log_contains("impalad", level, line_regex, expected_count, timeout_s)
|
||||
|
||||
def assert_catalogd_log_contains(self, level, line_regex, expected_count=1):
|
||||
def assert_catalogd_log_contains(self, level, line_regex, expected_count=1,
|
||||
timeout_s=6):
|
||||
"""
|
||||
Convenience wrapper around assert_log_contains for catalogd logs.
|
||||
"""
|
||||
self.assert_log_contains("catalogd", level, line_regex, expected_count)
|
||||
self.assert_log_contains("catalogd", level, line_regex, expected_count, timeout_s)
|
||||
|
||||
def assert_log_contains(self, daemon, level, line_regex, expected_count=1):
|
||||
def assert_log_contains(self, daemon, level, line_regex, expected_count=1, timeout_s=6):
|
||||
"""
|
||||
Assert that the daemon log with specified level (e.g. ERROR, WARNING, INFO) contains
|
||||
expected_count lines with a substring matching the regex. When expected_count is -1,
|
||||
at least one match is expected.
|
||||
Retries until 'timeout_s' has expired. The default timeout is the default minicluster
|
||||
log buffering time (5 seconds) with a one second buffer.
|
||||
When using this method to check log files of running processes, the caller should
|
||||
make sure that log buffering has been disabled, for example by adding
|
||||
'-logbuflevel=-1' to the daemon startup options.
|
||||
'-logbuflevel=-1' to the daemon startup options or set timeout_s to a value higher
|
||||
than the log flush interval.
|
||||
"""
|
||||
pattern = re.compile(line_regex)
|
||||
found = 0
|
||||
if hasattr(self, "impala_log_dir"):
|
||||
log_dir = self.impala_log_dir
|
||||
else:
|
||||
log_dir = EE_TEST_LOGS_DIR
|
||||
log_file_path = os.path.join(log_dir, daemon + "." + level)
|
||||
# Resolve symlinks to make finding the file easier.
|
||||
log_file_path = os.path.realpath(log_file_path)
|
||||
with open(log_file_path) as log_file:
|
||||
for line in log_file:
|
||||
if pattern.search(line):
|
||||
found += 1
|
||||
if expected_count == -1:
|
||||
assert found > 0, "Expected at least one line in file %s matching regex '%s'"\
|
||||
", but found none." % (log_file_path, line_regex)
|
||||
else:
|
||||
assert found == expected_count, "Expected %d lines in file %s matching regex '%s'"\
|
||||
", but found %d lines. Last line was: \n%s" %\
|
||||
(expected_count, log_file_path, line_regex, found, line)
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
found = 0
|
||||
if hasattr(self, "impala_log_dir"):
|
||||
log_dir = self.impala_log_dir
|
||||
else:
|
||||
log_dir = EE_TEST_LOGS_DIR
|
||||
log_file_path = os.path.join(log_dir, daemon + "." + level)
|
||||
# Resolve symlinks to make finding the file easier.
|
||||
log_file_path = os.path.realpath(log_file_path)
|
||||
with open(log_file_path) as log_file:
|
||||
for line in log_file:
|
||||
if pattern.search(line):
|
||||
found += 1
|
||||
if expected_count == -1:
|
||||
assert found > 0, "Expected at least one line in file %s matching regex '%s'"\
|
||||
", but found none." % (log_file_path, line_regex)
|
||||
else:
|
||||
assert found == expected_count, \
|
||||
"Expected %d lines in file %s matching regex '%s', but found %d lines. "\
|
||||
"Last line was: \n%s" %\
|
||||
(expected_count, log_file_path, line_regex, found, line)
|
||||
return
|
||||
except AssertionError as e:
|
||||
# Re-throw the exception to the caller only when the timeout is expired. Otherwise
|
||||
# sleep before retrying.
|
||||
if time.time() - start_time > timeout_s:
|
||||
raise
|
||||
LOG.info("Expected log lines could not be found, sleeping before retrying: %s",
|
||||
str(e))
|
||||
time.sleep(1)
|
||||
|
||||
@@ -40,8 +40,6 @@ class TestLogFragments(ImpalaTestSuite):
|
||||
query_id = re.search("id=([0-9a-f]+:[0-9a-f]+)",
|
||||
result.runtime_profile).groups()[0]
|
||||
self.execute_query("select 1")
|
||||
# Logging may be buffered, so sleep to wait out the buffering.
|
||||
time.sleep(6)
|
||||
self.assert_impalad_log_contains('INFO', query_id +
|
||||
"] Analysis and authorization finished.")
|
||||
assert query_id.endswith("000")
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
|
||||
from tests.common.environ import ImpalaTestClusterFlagsDetector
|
||||
from tests.common.file_utils import grep_dir
|
||||
from tests.common.skip import SkipIfBuildType
|
||||
from tests.common.skip import SkipIfBuildType, SkipIfDockerizedCluster
|
||||
from tests.common.impala_cluster import ImpalaCluster
|
||||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||||
import itertools
|
||||
@@ -762,3 +762,29 @@ class TestWebPage(ImpalaTestSuite):
|
||||
self.get_and_check_status(self.ROOT_URL,
|
||||
"href='http://.*:%s/'" % self.IMPALAD_TEST_PORT[0], self.IMPALAD_TEST_PORT,
|
||||
regex=True, headers={'X-Forwarded-Context': '/gateway'})
|
||||
|
||||
@SkipIfDockerizedCluster.daemon_logs_not_exposed
|
||||
def test_display_src_socket_in_query_cause(self):
|
||||
# Execute a long running query then cancel it from the WebUI.
|
||||
# Check the runtime profile and the INFO logs for the cause message.
|
||||
query = "select sleep(10000)"
|
||||
query_id = self.execute_query_async(query).get_handle().id
|
||||
cancel_query_url = "{0}cancel_query?query_id={1}".format(self.ROOT_URL.format
|
||||
("25000"), query_id)
|
||||
text_profile_url = "{0}query_profile_plain_text?query_id={1}".format(self.ROOT_URL
|
||||
.format("25000"), query_id)
|
||||
requests.get(cancel_query_url)
|
||||
response = requests.get(text_profile_url)
|
||||
cancel_status = "Cancelled from Impala's debug web interface by client at"
|
||||
assert cancel_status in response.text
|
||||
self.assert_impalad_log_contains("INFO", "Cancelled from Impala\'s debug web "
|
||||
"interface by client at", expected_count=-1)
|
||||
# Session closing from the WebUI does not produce the cause message in the profile,
|
||||
# so we will skip checking the runtime profile.
|
||||
results = self.execute_query("select current_session()")
|
||||
session_id = results.data[0]
|
||||
close_session_url = "{0}close_session?session_id={1}".format(self.ROOT_URL.format
|
||||
("25000"), session_id)
|
||||
requests.get(close_session_url)
|
||||
self.assert_impalad_log_contains("INFO", "Session closed from Impala\'s debug "
|
||||
"web interface by client at", expected_count=-1)
|
||||
|
||||
Reference in New Issue
Block a user