mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-7433: reduce logging on executors
Moved logs to -v=2 for reasons described in the JIRA. Added more details to some existing log messages or new less-frequent log messages so that useful information is not removed. Sample logging for an executor after the change: I0813 12:10:50.249850 31250 impala-internal-service.cc:49] ExecQueryFInstances(): query_id=fd4ae28bc993236e:27343be100000000 coord=tarmstrong-box:22000 #instances=2 I0813 12:10:50.250722 31256 query-state.cc:477] Executing instance. instance_id=fd4ae28bc993236e:27343be100000006 fragment_idx=1 per_fragment_instance_idx=2 coord_state_idx=1 #in-flight=1 I0813 12:10:50.250804 31259 query-state.cc:477] Executing instance. instance_id=fd4ae28bc993236e:27343be100000003 fragment_idx=2 per_fragment_instance_idx=2 coord_state_idx=1 #in-flight=2 I0813 12:10:50.374167 31259 query-state.cc:485] Instance completed. instance_id=fd4ae28bc993236e:27343be100000003 #in-flight=1 status=OK I0813 12:10:50.375370 31269 krpc-data-stream-mgr.cc:294] DeregisterRecvr(): fragment_instance_id=fd4ae28bc993236e:27343be100000006, node=3 I0813 12:10:50.417552 31256 query-state.cc:485] Instance completed. instance_id=fd4ae28bc993236e:27343be100000006 #in-flight=0 status=OK I0813 12:10:50.418007 31256 query-exec-mgr.cc:179] ReleaseQueryState(): deleted query_id=fd4ae28bc993236e:27343be100000000 Change-Id: I6c1db44acc6def2b05a4fd032c63716e08cdf5ff Reviewed-on: http://gerrit.cloudera.org:8080/11202 Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
6e5ec22b12
commit
df1865856a
@@ -182,7 +182,7 @@ bool ScanNode::WaitForRuntimeFilters() {
|
||||
if (arrived_filter_ids.size() == filter_ctxs_.size()) {
|
||||
runtime_profile()->AddInfoString("Runtime filters",
|
||||
Substitute("All filters arrived. Waited $0", wait_time));
|
||||
VLOG_QUERY << "Filters arrived. Waited " << wait_time;
|
||||
VLOG(2) << "Filters arrived. Waited " << wait_time;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ bool ScanNode::WaitForRuntimeFilters() {
|
||||
"Not all filters arrived (arrived: [$0], missing [$1]), waited for $2",
|
||||
join(arrived_filter_ids, ", "), join(missing_filter_ids, ", "), wait_time);
|
||||
runtime_profile()->AddInfoString("Runtime filters", filter_str);
|
||||
VLOG_QUERY << filter_str;
|
||||
VLOG(2) << filter_str;
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -248,9 +248,9 @@ void ScanNode::ScannerThreadState::Open(
|
||||
// the producer/consumer.
|
||||
max_row_batches = max(2, max_row_batches / state->query_options().mt_dop);
|
||||
}
|
||||
VLOG_QUERY << "Max row batch queue size for scan node '" << parent->id()
|
||||
<< "' in fragment instance '" << PrintId(state->fragment_instance_id())
|
||||
<< "': " << max_row_batches;
|
||||
VLOG(2) << "Max row batch queue size for scan node '" << parent->id()
|
||||
<< "' in fragment instance '" << PrintId(state->fragment_instance_id())
|
||||
<< "': " << max_row_batches;
|
||||
batch_queue_.reset(
|
||||
new RowBatchQueue(max_row_batches, FLAGS_max_queued_row_batch_bytes));
|
||||
|
||||
|
||||
@@ -60,9 +60,9 @@ Status InitialReservations::Init(
|
||||
PrettyPrinter::Print(query_min_reservation, TUnit::BYTES), FLAGS_hostname,
|
||||
FLAGS_be_port, PrintId(query_id), reservation_status.GetDetail());
|
||||
}
|
||||
VLOG_QUERY << "Successfully claimed initial reservations ("
|
||||
<< PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") for"
|
||||
<< " query " << PrintId(query_id);
|
||||
VLOG(2) << "Successfully claimed initial reservations ("
|
||||
<< PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") for"
|
||||
<< " query " << PrintId(query_id);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
||||
@@ -555,9 +555,9 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
|
||||
DequeueDeferredRpc();
|
||||
}
|
||||
}
|
||||
VLOG_QUERY << "cancelled stream: fragment_instance_id="
|
||||
<< PrintId(recvr_->fragment_instance_id())
|
||||
<< " node_id=" << recvr_->dest_node_id();
|
||||
VLOG(2) << "cancelled stream: fragment_instance_id="
|
||||
<< PrintId(recvr_->fragment_instance_id())
|
||||
<< " node_id=" << recvr_->dest_node_id();
|
||||
// Wake up all threads waiting to produce/consume batches. They will all
|
||||
// notice that the stream is cancelled and handle it.
|
||||
data_arrival_cv_.notify_all();
|
||||
|
||||
@@ -220,8 +220,8 @@ MemTracker* MemTracker::CreateQueryMemTracker(const TUniqueId& id,
|
||||
<< " exceeds physical memory of "
|
||||
<< PrettyPrinter::Print(MemInfo::physical_mem(), TUnit::BYTES);
|
||||
}
|
||||
VLOG_QUERY << "Using query memory limit: "
|
||||
<< PrettyPrinter::Print(byte_limit, TUnit::BYTES);
|
||||
VLOG(2) << "Using query memory limit: "
|
||||
<< PrettyPrinter::Print(byte_limit, TUnit::BYTES);
|
||||
}
|
||||
|
||||
MemTracker* pool_tracker =
|
||||
|
||||
@@ -43,8 +43,8 @@ DEFINE_int32(log_mem_usage_interval, 0, "If non-zero, impalad will output memory
|
||||
|
||||
Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
|
||||
TUniqueId query_id = params.query_ctx.query_id;
|
||||
VLOG_QUERY << "StartQueryFInstances() query_id=" << PrintId(query_id)
|
||||
<< " coord=" << TNetworkAddressToString(params.query_ctx.coord_address);
|
||||
VLOG(2) << "StartQueryFInstances() query_id=" << PrintId(query_id)
|
||||
<< " coord=" << TNetworkAddressToString(params.query_ctx.coord_address);
|
||||
|
||||
bool dummy;
|
||||
QueryState* qs = GetOrCreateQueryState(params.query_ctx, &dummy);
|
||||
@@ -152,8 +152,8 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
|
||||
// don't reference anything from 'qs' beyond this point, 'qs' might get
|
||||
// gc'd out from under us
|
||||
qs = nullptr;
|
||||
VLOG_QUERY << "ReleaseQueryState(): query_id=" << PrintId(query_id)
|
||||
<< " refcnt=" << cnt + 1;
|
||||
VLOG(2) << "ReleaseQueryState(): query_id=" << PrintId(query_id)
|
||||
<< " refcnt=" << cnt + 1;
|
||||
DCHECK_GE(cnt, 0);
|
||||
if (cnt > 0) return;
|
||||
|
||||
@@ -176,4 +176,5 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
|
||||
}
|
||||
// TODO: send final status report during gc, but do this from a different thread
|
||||
delete qs_from_map;
|
||||
VLOG(1) << "ReleaseQueryState(): deleted query_id=" << PrintId(query_id);
|
||||
}
|
||||
|
||||
@@ -160,8 +160,8 @@ void QueryState::InitMemTrackers() {
|
||||
int64_t bytes_limit = -1;
|
||||
if (query_options().__isset.mem_limit && query_options().mem_limit > 0) {
|
||||
bytes_limit = query_options().mem_limit;
|
||||
VLOG_QUERY << "Using query memory limit from query options: "
|
||||
<< PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
|
||||
VLOG(2) << "Using query memory limit from query options: "
|
||||
<< PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
|
||||
}
|
||||
query_mem_tracker_ =
|
||||
MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_);
|
||||
@@ -182,7 +182,7 @@ Status QueryState::InitBufferPoolState() {
|
||||
DCHECK_GE(mem_limit, 0);
|
||||
max_reservation = ReservationUtil::GetReservationLimitFromMemLimit(mem_limit);
|
||||
}
|
||||
VLOG_QUERY << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation;
|
||||
VLOG(2) << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation;
|
||||
|
||||
buffer_reservation_ = obj_pool_.Add(new ReservationTracker);
|
||||
buffer_reservation_->InitChildTracker(
|
||||
@@ -353,8 +353,8 @@ Status QueryState::WaitForFinish() {
|
||||
}
|
||||
|
||||
void QueryState::StartFInstances() {
|
||||
VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id())
|
||||
<< " #instances=" << rpc_params_.fragment_instance_ctxs.size();
|
||||
VLOG(2) << "StartFInstances(): query_id=" << PrintId(query_id())
|
||||
<< " #instances=" << rpc_params_.fragment_instance_ctxs.size();
|
||||
DCHECK_GT(refcnt_.Load(), 0);
|
||||
DCHECK_GT(exec_resource_refcnt_.Load(), 0) << "Should have been taken in Init()";
|
||||
|
||||
@@ -371,8 +371,8 @@ void QueryState::StartFInstances() {
|
||||
ReportExecStatusAux(true, status, nullptr, false);
|
||||
return;
|
||||
}
|
||||
VLOG_QUERY << "descriptor table for query=" << PrintId(query_id())
|
||||
<< "\n" << desc_tbl_->DebugString();
|
||||
VLOG(2) << "descriptor table for query=" << PrintId(query_id())
|
||||
<< "\n" << desc_tbl_->DebugString();
|
||||
|
||||
Status thread_create_status;
|
||||
DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
|
||||
|
||||
@@ -84,7 +84,7 @@ RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filte
|
||||
if (consumed_filters_.find(filter_desc.filter_id) == consumed_filters_.end()) {
|
||||
ret = obj_pool_.Add(new RuntimeFilter(filter_desc, filter_desc.filter_size_bytes));
|
||||
consumed_filters_[filter_desc.filter_id] = ret;
|
||||
VLOG_QUERY << "registered consumer filter " << filter_desc.filter_id;
|
||||
VLOG(2) << "registered consumer filter " << filter_desc.filter_id;
|
||||
} else {
|
||||
// The filter has already been registered in this filter bank by another
|
||||
// target node.
|
||||
|
||||
@@ -41,14 +41,21 @@ ImpalaInternalService::ImpalaInternalService() {
|
||||
|
||||
void ImpalaInternalService::ExecQueryFInstances(TExecQueryFInstancesResult& return_val,
|
||||
const TExecQueryFInstancesParams& params) {
|
||||
VLOG_QUERY << "ExecQueryFInstances():" << " query_id=" <<
|
||||
PrintId(params.query_ctx.query_id);
|
||||
FAULT_INJECTION_RPC_DELAY(RPC_EXECQUERYFINSTANCES);
|
||||
DCHECK(params.__isset.coord_state_idx);
|
||||
DCHECK(params.__isset.query_ctx);
|
||||
DCHECK(params.__isset.fragment_ctxs);
|
||||
DCHECK(params.__isset.fragment_instance_ctxs);
|
||||
query_exec_mgr_->StartQuery(params).SetTStatus(&return_val);
|
||||
VLOG_QUERY << "ExecQueryFInstances():" << " query_id="
|
||||
<< PrintId(params.query_ctx.query_id)
|
||||
<< " coord=" << TNetworkAddressToString(params.query_ctx.coord_address)
|
||||
<< " #instances=" << params.fragment_instance_ctxs.size();
|
||||
Status status = query_exec_mgr_->StartQuery(params);
|
||||
status.SetTStatus(&return_val);
|
||||
if (!status.ok()) {
|
||||
LOG(INFO) << "ExecQueryFInstances() failed: query_id="
|
||||
<< PrintId(params.query_ctx.query_id) << ": " << status.GetDetail();
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T> void SetUnknownIdError(
|
||||
|
||||
Reference in New Issue
Block a user