Fixing bugs in PlanFragmentExecutor:

- the report callback is now *always* invoked when execution finishes,
  regardless of --status_report_interval setting
- except that no final report is sent if the execution was cancelled
- the report thread is stopped when execution finishes in Open() or GetNext()
  (ie, not in the d'tor)
This commit is contained in:
Marcel Kornacker
2012-09-24 22:44:39 -07:00
committed by Henry Robinson
parent a55e7777ef
commit f58db29942
3 changed files with 64 additions and 29 deletions

View File

@@ -607,7 +607,8 @@ void Coordinator::CancelInternal() {
TCancelPlanFragmentResult res;
try {
VLOG_QUERY << "sending CancelPlanFragment rpc for fragment_id="
<< exec_state->fragment_id;
<< exec_state->fragment_id << " backend="
<< exec_state->hostport.first << ":" << exec_state->hostport.second;
backend_client->CancelPlanFragment(res, params);
} catch (TTransportException& e) {
stringstream msg;

View File

@@ -56,7 +56,8 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
sink_->Close(runtime_state());
}
}
StopReportThread();
// at this point, the report thread should have been stopped
DCHECK(!report_thread_active_);
}
Status PlanFragmentExecutor::Prepare(
@@ -152,7 +153,9 @@ Status PlanFragmentExecutor::Open() {
VLOG_QUERY << "Open(): fragment_id=" << runtime_state_->fragment_id();
// we need to start the profile-reporting thread before calling Open(), since it
// may block
if (!report_status_cb_.empty() && FLAGS_status_report_interval >= 0) {
// TODO: if no report thread is started, make sure to send a final profile
// at end, otherwise the coordinator hangs in case we finish w/ an error
if (!report_status_cb_.empty() && FLAGS_status_report_interval > 0) {
unique_lock<mutex> l(report_thread_lock_);
report_thread_ = thread(&PlanFragmentExecutor::ReportProfile, this);
// make sure the thread started up, otherwise ReportProfile() might get into a race
@@ -203,7 +206,10 @@ Status PlanFragmentExecutor::OpenInternal() {
// Setting to NULL ensures that the d'tor won't double-close the sink.
sink_.reset(NULL);
done_ = true;
StopReportThread();
SendReport(true);
return Status::OK;
}
@@ -227,21 +233,32 @@ void PlanFragmentExecutor::ReportProfile() {
VLOG_QUERY << ss.str();
}
Status status;
{
lock_guard<mutex> l2(status_lock_);
status = status_;
}
// notified means someone notified on stop_report_thread_cv_
report_status_cb_(status, profile(), notified || !status.ok());
if (notified || !status.ok()) {
if (notified) {
VLOG_QUERY << "exiting reporting thread: fragment_id="
<< runtime_state_->fragment_id();
return;
} else {
SendReport(false);
}
}
}
void PlanFragmentExecutor::SendReport(bool done) {
if (report_status_cb_.empty()) return;
Status status;
{
lock_guard<mutex> l(status_lock_);
status = status_;
}
// don't send a final report if we got cancelled, nobody's going to look at it
// anyway
if (!status.IsCancelled()) {
// notified means someone notified on stop_report_thread_cv_
report_status_cb_(status, profile(), done || !status.ok());
}
}
void PlanFragmentExecutor::StopReportThread() {
if (!report_thread_active_) return;
{
@@ -262,6 +279,8 @@ Status PlanFragmentExecutor::GetNext(RowBatch** batch) {
if (done_) {
VLOG_QUERY << "Finished executing fragment query_id=" << PrintId(query_id_)
<< " fragment_id=" << PrintId(runtime_state_->fragment_id());
StopReportThread();
SendReport(true);
}
return status;
}
@@ -290,6 +309,8 @@ void PlanFragmentExecutor::UpdateStatus(const Status& status) {
if (status.ok()) return;
lock_guard<mutex> l(status_lock_);
if (status_.ok()) status_ = status;
StopReportThread();
SendReport(true);
}
void PlanFragmentExecutor::Cancel() {

View File

@@ -33,10 +33,14 @@ class TPlanExecParams;
// The executor makes an aggregated profile for the entire fragment available,
// which includes profile information for the plan itself as well as the output
// sink, if any.
// The ReportStatusCallback passed into the c'tor is invoked periodically (controlled
// by the flag status_report_interval) to report the execution status. It is guaranteed
// to be called at least once at the end of execution with an overall status and profile
// (and 'done' indicator).
// The ReportStatusCallback passed into the c'tor is invoked periodically to report the
// execution status. The frequency of those reports is controlled by the flag
// status_report_interval; setting that flag to 0 disables periodic reporting altogether
// Regardless of the value of that flag, if a report callback is specified, it is
// invoked at least once at the end of execution with an overall status and profile
// (and 'done' indicator). The only exception is when execution is cancelled, in which
// case the callback is *not* invoked (the coordinator already knows that execution
// stopped, because it initiated the cancellation).
//
// Aside from Cancel(), which may be called asynchronously, this class is not
// thread-safe.
@@ -57,7 +61,10 @@ class PlanFragmentExecutor {
PlanFragmentExecutor(ExecEnv* exec_env, const ReportStatusCallback& report_status_cb);
// Closes the underlying plan fragment and frees up all resources allocated
// in Open()/GetNext(). Tells report_thread_ to stop and blocks until it has finished.
// in Open()/GetNext().
// It is an error to delete a PlanFragmentExecutor with a report callback
// before Open()/GetNext() (depending on whether the fragment has a sink)
// indicated that execution is finished.
~PlanFragmentExecutor();
// Prepare for execution. Call this prior to Open().
@@ -68,16 +75,19 @@ class PlanFragmentExecutor {
// Start execution. Call this prior to GetNext().
// If this fragment has a sink, Open() will send all rows produced
// by the fragment to that sink. Therefore, Open() may block until
// all rows are produced.
// all rows are produced (and a subsequent call to GetNext() will not return
// any rows).
// This also starts the status-reporting thread, if the interval flag
// is > 0 and a callback was specified in the c'tor.
// If this fragment has a sink, the report thread will have invoked
// report_status_cb for the final time when Open() returns.
// If this fragment has a sink, report_status_cb will have been called for the final
// time when Open() returns, and the status-reporting thread will have been stopped.
Status Open();
// Return results through 'batch'. Sets '*batch' to NULL if no more results.
// '*batch' is owned by PlanFragmentExecutor and must not be deleted.
// When *batch == NULL, GetNext() should not be called anymore.
// When *batch == NULL, GetNext() should not be called anymore. Also, report_status_cb
// will have been called for the final time and the status-reporting thread
// will have been stopped.
Status GetNext(RowBatch** batch);
// Initiate cancellation. Must not be called until after Prepare() returned.
@@ -138,25 +148,28 @@ class PlanFragmentExecutor {
ObjectPool* obj_pool() { return runtime_state_->obj_pool(); }
// Main loop of profile reporting thread.
// Exits if:
// - notified on done_cv_
// - !status_.ok() at profile reporting time
// Exits when notified on done_cv_.
// On exit, *no report is sent*, ie, this will not send the final report.
void ReportProfile();
// Invoked the report callback if there is a report callback and the current
// status isn't CANCELLED. Sets 'done' to true in the callback invocation if
// done == true or we have an error status.
void SendReport(bool done);
void Close();
// If status_.ok(), sets status_ to status.
// If we're transitioning to an error status, stops report thread and
// sends a final report.
void UpdateStatus(const Status& status);
// Executes Open() logic and returns resulting status. Does not set status_.
// If this plan fragment has no sink, OpenInternal() does nothing.
// If this plan fragment has a sink and OpenInternal() returns without an
// error condition all rows will have been sent to the sink, the sink will
// have been closed and the report thread will have been stopped, ensuring a
// last status report will have been sent to the coordinator. sink_ will be
// set to NULL after successful execution.
// In the error case, the destructor will clean up the report thread and the
// sink.
// error condition, all rows will have been sent to the sink, the sink will
// have been closed, a final report will have been sent and the report thread will
// have been stopped. sink_ will be set to NULL after successful execution.
Status OpenInternal();
// Executes GetNext() logic and returns resulting status.