IMPALA-12801: Increase query_log_ default size and bound its memory.

Coordinator's /queries page is useful to show information about recently
run and completed queries. Having more entries will be helpful to
inspect queries that completed further back. The maximum entry of this
table is controlled by 'query_log_size' flag. Higher value means more
queries to keep, but it also cost more memory overhead in coordinator.

This patch increase 'query_log_size' default value from 100 to 200. This
patch also add flag 'query_log_size_in_bytes' (default to 2GB) as an
additional safeguard to evict entry from query_log_ when this limit
exceeded, preventing query_log_ total memory to grow prohibitively
large. 'query_log_size_in_bytes' is used in combination with
'query_log_size' to limit the number of QueryStateRecord to retain in
query_log_, whichever is less.

Testing:
- Pass exhaustive tests.

Change-Id: I107e2c2c7f2b239557be37360e8eecf5479e8602
Reviewed-on: http://gerrit.cloudera.org:8080/21020
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Riza Suminto
2024-02-07 08:47:38 -08:00
committed by Impala Public Jenkins
parent 15e471563d
commit f5c12c65db
5 changed files with 110 additions and 14 deletions

View File

@@ -173,8 +173,14 @@ DEFINE_int32(fe_service_threads, 64,
"number of threads available to serve client requests");
DEFINE_string(default_query_options, "", "key=value pair of default query options for"
" impalad, separated by ','");
DEFINE_int32(query_log_size, 100, "Number of queries to retain in the query log. If -1, "
"the query log has unbounded size.");
DEFINE_int32(query_log_size, 200,
"Number of queries to retain in the query log. If -1, "
"the query log has unbounded size. Used in combination with query_log_size_in_bytes, "
"whichever is less.");
DEFINE_int64(query_log_size_in_bytes, 2L * 1024 * 1024 * 1024,
"Total maximum bytes of queries to retain in the query log. If -1, "
"the query log has unbounded size. Used in combination with query_log_size, "
"whichever is less");
DEFINE_int32(query_stmt_size, 250, "length of the statements in the query log. If <=0, "
"the full statement is displayed in the query log without trimming.");
DEFINE_bool(log_query_to_file, true, "if true, logs completed query profiles to file.");
@@ -1139,7 +1145,7 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& query_handle) {
}
}
if (FLAGS_query_log_size == 0) return;
if (FLAGS_query_log_size == 0 || FLAGS_query_log_size_in_bytes == 0) return;
// 'fetch_rows_lock()' protects several fields in ClientReqestState that are read
// during QueryStateRecord creation. There should be no contention on this lock because
// the query has already been closed (e.g. no more results can be fetched).
@@ -1151,20 +1157,79 @@ void ImpalaServer::ArchiveQuery(const QueryHandle& query_handle) {
if (query_handle->GetCoordinator() != nullptr) {
query_handle->GetCoordinator()->GetTExecSummary(&record->exec_summary);
}
int64_t record_size = EstimateSize(record.get());
VLOG(3) << "QueryStateRecord of " << PrintId(query_handle->query_id()) << " is "
<< record_size << " bytes";
{
lock_guard<mutex> l(query_log_lock_);
// Add record to the beginning of the log, and to the lookup index.
query_log_est_total_size_ += record_size;
query_log_est_sizes_.push_front(record_size);
query_log_.push_front(move(record));
query_log_index_[query_handle->query_id()] = &query_log_.front();
if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size()) {
DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
while (!query_log_.empty()
&& ((FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size())
|| (FLAGS_query_log_size_in_bytes > -1
&& FLAGS_query_log_size_in_bytes < query_log_est_total_size_))) {
query_log_index_.erase(query_log_.back()->id);
query_log_est_total_size_ -= query_log_est_sizes_.back();
query_log_est_sizes_.pop_back();
query_log_.pop_back();
DCHECK_GE(query_log_est_total_size_, 0);
DCHECK_EQ(query_log_.size(), query_log_est_sizes_.size());
}
}
}
int64_t ImpalaServer::EstimateSize(const QueryStateRecord* record) {
int64_t size = sizeof(QueryStateRecord); // 800
size += sizeof(uint8_t) * record->compressed_profile.capacity();
size += record->effective_user.capacity();
size += record->default_db.capacity();
size += record->stmt.capacity();
size += record->plan.capacity();
size += record->query_state.capacity();
size += record->timeline.capacity();
size += record->resource_pool.capacity();
// The following dynamic memory of field members are estimated rather than
// exactly sized. Some of thrift members might be nested, but the estimation
// does not traverse deeper than the first level.
// TExecSummary exec_summary
if (record->exec_summary.__isset.nodes) {
size += sizeof(TPlanNodeExecSummary) * record->exec_summary.nodes.capacity();
}
if (record->exec_summary.__isset.exch_to_sender_map) {
size += sizeof(int32_t) * 2 * record->exec_summary.exch_to_sender_map.size();
}
if (record->exec_summary.__isset.error_logs) {
for (const auto& log : record->exec_summary.error_logs) size += log.capacity();
}
if (record->exec_summary.__isset.queued_reason) {
size += record->exec_summary.queued_reason.capacity();
}
// Status query_status
if (!record->query_status.ok()) {
size += record->query_status.msg().msg().capacity();
for (const auto& detail : record->query_status.msg().details()) {
size += detail.capacity();
}
}
// TEventSequence event_sequence
size += record->event_sequence.name.capacity();
size += sizeof(int64_t) * record->event_sequence.timestamps.capacity();
for (const auto& label : record->event_sequence.labels) size += label.capacity();
// vector<TPlanFragment> fragments
size += sizeof(TPlanFragment) * record->fragments.capacity();
return size;
}
ImpalaServer::~ImpalaServer() {}
void ImpalaServer::AddPoolConfiguration(TQueryCtx* ctx,

View File

@@ -942,7 +942,7 @@ class ImpalaServer : public ImpalaServiceIf,
std::string ColumnTypeToBeeswaxTypeString(const TColumnType& type);
/// Snapshot of a query's state, archived in the query log. Not mutated after
/// construction.
/// construction. Please update EstimateSize() if field member changed.
struct QueryStateRecord {
/// Compressed representation of profile returned by RuntimeProfile::Compress().
/// Must be initialised to a valid value if this is a completed query.
@@ -1078,6 +1078,11 @@ class ImpalaServer : public ImpalaServiceIf,
bool operator() (const QueryStateRecord& lhs, const QueryStateRecord& rhs) const;
};
/// Return the estimated size of given record in bytes.
/// It does not meant to return exact byte size of given QueryStateRecord in memory,
/// but should account for compressed_profile vector of record.
static int64_t EstimateSize(const QueryStateRecord* record);
/// Returns the active QueryHandle for this query id. The QueryHandle contains the
/// active ClientRequestState. Returns an error Status if the query id cannot be found.
/// If caller is an RPC thread, RPC context will be registered for time tracking
@@ -1269,7 +1274,8 @@ class ImpalaServer : public ImpalaServiceIf,
/// Random number generator for use in this class, thread safe.
static ThreadSafeRandom rng_;
/// Guards query_log_ and query_log_index_
/// Guards query_log_, query_log_index_, query_log_est_sizes_,
/// and query_log_est_total_size_.
std::mutex query_log_lock_;
/// FIFO list of query records, which are written after the query finishes executing.
@@ -1288,6 +1294,10 @@ class ImpalaServer : public ImpalaServiceIf,
QueryLogIndex;
QueryLogIndex query_log_index_;
/// Estimated individual and total size of records in query_log_ in bytes.
std::list<int64_t> query_log_est_sizes_;
int64_t query_log_est_total_size_;
/// Sets the given query_record (and retried_query_record too if given) for the given
/// query_id. Returns an error Status if the given query_id cannot be found in the
/// QueryLogIndex.

View File

@@ -578,7 +578,8 @@ under the License.
</ul>
The queries are listed in reverse chronological order, with the most recent at the
top. You can control the amount of memory devoted to completed queries by specifying
the <codeph>-&#8209;-&#8209;query_log_size</codeph> startup option for
the <codeph>-&#8209;-&#8209;query_log_size</codeph>
and <codeph>-&#8209;-&#8209;query_log_size_in_bytes</codeph> startup options for
<codeph>impalad</codeph>.
</p>

View File

@@ -80,7 +80,7 @@ class TestWebPage(CustomClusterTestSuite):
def test_webserver_interface(self):
addrs = psutil.net_if_addrs()
print("net_if_addrs returned: %s" % addrs)
ip_matcher = re.compile("\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
ip_matcher = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
ip_addrs = []
for addr in addrs:
for snic in addrs[addr]:
@@ -147,6 +147,26 @@ class TestWebPage(CustomClusterTestSuite):
assert expected in response_json, "No matching statement found in the queries site."
assert '"resource_pool": "default-pool"' in response_json
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--query_log_size_in_bytes=" + str(2 * 1024 * 1024)
)
def test_query_log_size_in_bytes(self):
"""Check if query list is limited by query_log_size_in_bytes flag."""
# The input query will produce ~627228 bytes of QueryStateRecord in MT_DOP=8.
query = ("with l1 as (select id id1 from functional.alltypes), "
"l2 as (select a1.id1 id2 from l1 as a1 join l1 as b1 on a1.id1=-b1.id1), "
"l3 as (select a2.id2 id3 from l2 as a2 join l2 as b2 on a2.id2=-b2.id2), "
"l4 as (select a3.id3 id4 from l3 as a3 join l3 as b3 on a3.id3=-b3.id3), "
"l5 as (select a4.id4 id5 from l4 as a4 join l4 as b4 on a4.id4=-b4.id4) "
"select * from l5 limit 100;")
self.execute_query("SET MT_DOP=8;")
for i in range(0, 5):
self.execute_query(query)
response = requests.get("http://localhost:25000/queries?json")
queries_json = json.loads(response.text)
assert len(queries_json["completed_queries"]) == 3
# Checks if 'messages' exists/does not exist in 'result_stderr' based on the value of
# 'should_exist'
def _validate_shell_messages(self, result_stderr, messages, should_exist=True):

View File

@@ -25,11 +25,11 @@ under the License.
</style>
<h2>Queries</h2>
<p class="lead">This page lists all running queries plus completed queries
that are archived in memory, and imported query profiles.
The size of that archive is controlled with the
<samp>--query_log_size</samp> command line parameter.</p>
<p>The length of the statements are controlled with the <samp>--query_stmt_size</samp>
<p class="lead">This page lists all running queries, completed queries that are archived
in memory, and imported query profiles.
<p>The in-memory archive size is controlled by <samp>--query_log_size</samp>
and <samp>--query_log_size_in_bytes</samp> command line parameters.
The length of the statements are controlled by <samp>--query_stmt_size</samp>
command line parameter.</p>
<h3>{{num_executing_queries}} queries in flight</h3>