diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 3dc0e5438..20c7368e8 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -584,7 +584,13 @@ void HdfsScanNode::Close(RuntimeState* state) { DCHECK_EQ(num_owned_io_buffers_, 0) << "ScanNode has leaked io buffers"; if (reader_context_ != NULL) { - state->io_mgr()->UnregisterReader(reader_context_); + // There may still be io buffers used by parent nodes so we can't unregister the + // reader context yet. The runtime state keeps a list of all the reader contexts and + // they are unregistered when the fragment is closed. + state->reader_contexts()->push_back(reader_context_); + // Need to wait for all the active scanner threads to finish to ensure there is no + // more memory tracked by this scan node's mem tracker. + state->io_mgr()->WaitForDisksCompletion(reader_context_); } StopAndFinalizeCounters(); diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 950a2cab2..82f95022b 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -315,7 +315,7 @@ Status DiskIoMgr::RegisterReader(hdfsFS hdfs, ReaderContext** reader, return Status::OK; } -void DiskIoMgr::UnregisterReader(ReaderContext* reader) { +void DiskIoMgr::WaitForDisksCompletion(ReaderContext* reader) { // First cancel the reader. This is more or less a no-op if the reader is // complete (common case). reader->Cancel(Status::CANCELLED); @@ -325,8 +325,13 @@ void DiskIoMgr::UnregisterReader(ReaderContext* reader) { while (reader->num_disks_with_ranges_ > 0) { reader->disks_complete_cond_var_.wait(reader_lock); } +} + +void DiskIoMgr::UnregisterReader(ReaderContext* reader) { + WaitForDisksCompletion(reader); // All the disks are done with clean, validate nothing is leaking. 
+ unique_lock<mutex> reader_lock(reader->lock_); DCHECK_EQ(reader->num_buffers_in_reader_, 0) << endl << reader->DebugString(); DCHECK_EQ(reader->num_used_buffers_, 0) << endl << reader->DebugString(); diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h index d489a6ff7..6beeb062d 100644 --- a/be/src/runtime/disk-io-mgr.h +++ b/be/src/runtime/disk-io-mgr.h @@ -397,6 +397,12 @@ class DiskIoMgr { // UnregisterReader also cancels the reader from the disk IoMgr. void UnregisterReader(ReaderContext* reader); + // Cancels the reader and waits for the number of active disks for this reader to reach + // 0. This call blocks until all the disk threads have finished cleaning up the reader. + // After calling, the only valid API is returning IO buffers that have already been + // returned. Takes reader->lock_. + void WaitForDisksCompletion(ReaderContext* reader); + // This function cancels the reader asychronously. All outstanding requests // are aborted and tracking structures cleaned up. This does not need to be // called if the reader finishes normally. 
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc index f75c54a2d..13f5f7e9f 100644 --- a/be/src/runtime/plan-fragment-executor.cc +++ b/be/src/runtime/plan-fragment-executor.cc @@ -525,10 +525,11 @@ void PlanFragmentExecutor::Close() { exec_env_->cgroups_mgr()->UnregisterFragment( runtime_state_->fragment_instance_id(), runtime_state_->cgroup()); } - if (plan_ != NULL) - plan_->Close(runtime_state_.get()); - if (sink_.get() != NULL) - sink_->Close(runtime_state()); + if (plan_ != NULL) plan_->Close(runtime_state_.get()); + if (sink_.get() != NULL) sink_->Close(runtime_state()); + BOOST_FOREACH(DiskIoMgr::ReaderContext* reader, *runtime_state_->reader_contexts()) { + runtime_state_->io_mgr()->UnregisterReader(reader); + } exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool()); } if (mem_usage_sampled_counter_ != NULL) { diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index a9e2f7bcc..a08526ae1 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -23,7 +23,6 @@ #include "common/status.h" #include "exprs/expr.h" #include "runtime/descriptors.h" -#include "runtime/disk-io-mgr.h" #include "runtime/runtime-state.h" #include "runtime/timestamp-value.h" #include "runtime/data-stream-recvr.h" diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index cd7d687e2..e88716854 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -30,6 +30,7 @@ #include "statestore/query-resource-mgr.h" #include "runtime/exec-env.h" #include "runtime/descriptors.h" // for PlanNodeId +#include "runtime/disk-io-mgr.h" // for DiskIoMgr::ReaderContext #include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" #include "runtime/thread-resource-mgr.h" @@ -41,7 +42,6 @@ namespace impala { class DescriptorTbl; -class DiskIoMgr; class ObjectPool; class Status; class ExecEnv; @@ -128,6 +128,7 @@ class RuntimeState { 
FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; } PartitionRowCount* num_appended_rows() { return &num_appended_rows_; } PartitionInsertStats* insert_stats() { return &insert_stats_; } + std::vector<DiskIoMgr::ReaderContext*>* reader_contexts() { return &reader_contexts_; } // Returns runtime state profile RuntimeProfile* runtime_profile() { return &profile_; } @@ -332,6 +333,9 @@ class RuntimeState { // ResourceBroker instead. QueryResourceMgr* query_resource_mgr_; + // Reader contexts that need to be closed when the fragment is closed. + std::vector<DiskIoMgr::ReaderContext*> reader_contexts_; + // prohibit copies RuntimeState(const RuntimeState&); }; diff --git a/testdata/workloads/functional-query/queries/QueryTest/union.test b/testdata/workloads/functional-query/queries/QueryTest/union.test index 9e8f3aa86..10769c257 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/union.test +++ b/testdata/workloads/functional-query/queries/QueryTest/union.test @@ -795,3 +795,13 @@ int, bigint 1,1 2,3 ==== +---- QUERY +# IMPALA-843: Impalads crash while running join on string column + union +select count(*) from +(select 1 FROM alltypes AS t1 JOIN alltypestiny AS t2 ON t1.string_col = t2.string_col +UNION ALL SELECT 1 FROM tinytable AS t1) as t3 +---- TYPES +bigint +---- RESULTS +5843 +====