From 80c1d2dbaabc78bdf1b6e4da5475bfa365cd375e Mon Sep 17 00:00:00 2001 From: noemi Date: Wed, 6 Apr 2022 16:36:27 +0200 Subject: [PATCH] IMPALA-4530: Implement in-memory merge of quicksorted runs This change aims to decrease back-pressure in the sorter. It offers an alternative to the in-memory run formation strategy and sorting algorithm by introducing a new in-memory merge level between the in-memory quicksort and the external merge phase. Instead of forming one big run, it produces many smaller in-memory runs (called miniruns), sorts those with quicksort, then merges them in memory before spilling or serving GetNext(). The external merge phase remains the same. The feature is controlled by the MAX_SORT_RUN_SIZE development query option, which determines the maximum number of pages in a 'minirun'. The default value of MAX_SORT_RUN_SIZE is 0, which keeps the original implementation of 1 big initial in-memory run. Other valid values are integers of 2 and above. The recommended value is 10 or more, to avoid high fragmentation with large workloads and variable-length data. Testing: - added MAX_SORT_RUN_SIZE as an additional test dimension to test_sort.py with values [0, 2, 20] - additional partial sort test case (inserting into partitioned kudu table) - manual E2E testing Change-Id: I58c0ae112e279b93426752895ded7b1a3791865c Reviewed-on: http://gerrit.cloudera.org:8080/18393 Reviewed-by: Impala Public Jenkins Reviewed-by: Csaba Ringhofer Tested-by: Csaba Ringhofer --- be/src/exec/partial-sort-node.cc | 8 +- be/src/exec/partial-sort-node.h | 3 + be/src/runtime/sorter-internal.h | 40 ++++- be/src/runtime/sorter.cc | 272 ++++++++++++++++++++++++++--- be/src/runtime/sorter.h | 56 ++++-- be/src/service/query-options.cc | 9 + be/src/service/query-options.h | 5 +- be/src/util/tuple-row-compare.h | 22 ++- bin/perf_tools/perf-query.sh | 2 +- common/thrift/ImpalaService.thrift | 10 ++ common/thrift/Query.thrift | 5 + tests/query_test/test_sort.py | 82 +++++++-- 12 files changed, 452 insertions(+), 62 deletions(-) diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc index 1fadbe523..5919bb46a 100644 --- a/be/src/exec/partial-sort-node.cc +++ b/be/src/exec/partial-sort-node.cc @@ -66,6 +66,7 @@ PartialSortNode::PartialSortNode( input_eos_(false), sorter_eos_(true) { runtime_profile()->AddInfoString("SortType", "Partial"); + child_get_next_timer_ = ADD_SUMMARY_STATS_TIMER(runtime_profile(), "ChildGetNextTime"); } PartialSortNode::~PartialSortNode() { @@ -143,7 +144,12 @@ Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* if (input_batch_index_ == input_batch_->num_rows()) { input_batch_->Reset(); input_batch_index_ = 0; - RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(), &input_eos_)); + MonotonicStopWatch timer; + timer.Start(); + Status status = child(0)->GetNext(state, input_batch_.get(), &input_eos_); + timer.Stop(); + RETURN_IF_ERROR(status); + child_get_next_timer_->UpdateCounter(timer.ElapsedTime()); } int num_processed; diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h index a02bb2fca..0f1ebe456 100644 --- a/be/src/exec/partial-sort-node.h +++ b/be/src/exec/partial-sort-node.h @@ -86,6 +86,9 @@ class PartialSortNode : public ExecNode { const TupleRowComparatorConfig& tuple_row_comparator_config_; + /// Min, max, and avg time spent in calling GetNext on the child. + RuntimeProfile::SummaryStatsCounter* child_get_next_timer_; + ///////////////////////////////////////// /// BEGIN: Members that must be Reset() diff --git
a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h index 7661051ba..a974af456 100644 --- a/be/src/runtime/sorter-internal.h +++ b/be/src/runtime/sorter-internal.h @@ -114,13 +114,22 @@ class Sorter::Page { /// /// Runs are either "initial runs" constructed from the sorter's input by evaluating /// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed -/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once -/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by +/// by merging already-sorted runs. Initial runs are sorted in-place in memory. +/// Once sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by /// SortedRunMerger, either to produce the final sorted output or to produce another /// sorted run. +/// By default, the size of initial runs is determined by the available memory: the +/// sorter tries to add batches to the run until some (memory) limit is reached. +/// Some query options can also limit the size of an initial (or in-memory) run. +/// SORT_RUN_BYTES_LIMIT triggers spilling after the size of data in the run exceeds the +/// given threshold (usually expressed in MB or GB). +/// MAX_SORT_RUN_SIZE allows constructing runs up to a certain size by limiting the +/// number of pages in the initial runs. These smaller in-memory runs are also referred +/// to as 'miniruns'. Miniruns are not spilled immediately, but sorted in-place first, +/// and collected to be merged in memory before spilling the produced output run to disk. /// /// The expected calling sequence of functions is as follows: -/// * Init() to initialize the run and allocate initial pages. +/// * Init() or TryInit() to initialize the run and allocate initial pages. /// * Add*Batch() to add batches of tuples to the run. /// * FinalizeInput() to signal that no more batches will be added. /// * If the run is unsorted, it must be sorted. After that set_sorted() must be called. @@ -141,13 +150,23 @@ class Sorter::Run { /// var-len data into var_len_copy_page_. Status Init(); + /// Similar to Init(), except for the following differences: + /// It is only used to initialize miniruns (query option MAX_SORT_RUN_SIZE > 0 cases). + /// The first in-memory run is always initialized by calling Init(), because that must + /// succeed. The following ones are initialized by TryInit(). + /// TryInit() allocates one fixed-len page and one var-len page if 'has_var_len_slots_' + /// is true. There is no need for 'var_len_copy_page_' here. Sets '*allocation_failed' + /// to false if initialization was successful, and to true if the reservation was not + /// enough. + Status TryInit(bool* allocation_failed); + /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns /// the number of rows actually added in 'num_processed'. If the run is full (no more /// pages can be allocated), 'num_processed' may be less than the number of remaining /// rows in the batch. AddInputBatch() materializes the input rows using the /// expressions in sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just /// copies rows.
- Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed); + Status AddInputBatch( + RowBatch* batch, int start_index, int* num_processed, bool* allocation_failed); Status AddIntermediateBatch(RowBatch* batch, int start_index, int* num_processed); @@ -199,6 +218,9 @@ class Sorter::Run { bool is_finalized() const { return is_finalized_; } bool is_sorted() const { return is_sorted_; } void set_sorted() { is_sorted_ = true; } + int max_num_of_pages() const { return max_num_of_pages_; } + int fixed_len_size() { return fixed_len_pages_.size(); } + int run_size() { return fixed_len_pages_.size() + var_len_pages_.size(); } int64_t num_tuples() const { return num_tuples_; } /// Returns true if we have var-len pages in the run. bool HasVarLenPages() const { @@ -215,8 +237,8 @@ /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must /// match 'initial_run_' and 'has_var_len_slots_'. template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN> - Status AddBatchInternal( - RowBatch* batch, int start_index, int* num_processed); + Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed, + bool* allocation_failed); /// Finalize the list of pages: delete empty final pages and unpin the previous page /// if the run is unpinned. @@ -352,6 +374,12 @@ class Sorter::Run { /// Used to implement GetNextBatch() interface required for the merger. boost::scoped_ptr<RowBatch> buffered_batch_; + /// Max number of fixed-len + var-len pages in an in-memory minirun. It defines the + /// length of a minirun. + /// The default value is 0, which means that only 1 in-memory run will be created, and + /// its size will be determined by other limits, e.g. memory or sort_run_bytes_limit. + int max_num_of_pages_; + /// Members used when a run is read in GetNext(). /// The index into 'fixed_len_pages_' and 'var_len_pages_' of the pages being read in GetNext(). int fixed_len_pages_index_; diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index ae885252b..93a4cb908 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -121,11 +121,12 @@ Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_ is_pinned_(initial_run), is_finalized_(false), is_sorted_(!initial_run), - num_tuples_(0) {} + num_tuples_(0), + max_num_of_pages_(initial_run ? parent->inmem_run_max_pages_ : 0) {} Status Sorter::Run::Init() { - int num_to_create = 1 + has_var_len_slots_ - + (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_); + int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && initial_run_ && + (sorter_->enable_spilling_ && max_num_of_pages_ == 0)); int64_t required_mem = num_to_create * sorter_->page_len_; if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) { return Status(Substitute( @@ -152,9 +153,32 @@ Status Sorter::Run::Init() { return Status::OK(); } +Status Sorter::Run::TryInit(bool* allocation_failed) { + *allocation_failed = true; + DCHECK_GT(sorter_->inmem_run_max_pages_, 0); + // No need for an additional copy page because var-len data is not reordered.
+ // The in-memory merger can copy var-len data directly from the in-memory runs, + // which are kept until the merge is finished. + int num_to_create = 1 + has_var_len_slots_; + int64_t required_mem = num_to_create * sorter_->page_len_; + if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) { + return Status::OK(); + } + + RETURN_IF_ERROR(AddPage(&fixed_len_pages_)); + if (has_var_len_slots_) { + RETURN_IF_ERROR(AddPage(&var_len_pages_)); + } + if (initial_run_) { + sorter_->initial_runs_counter_->Add(1); + } + *allocation_failed = false; + return Status::OK(); +} + template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN> Status Sorter::Run::AddBatchInternal( - RowBatch* batch, int start_index, int* num_processed) { + RowBatch* batch, int start_index, int* num_processed, bool* allocation_failed) { DCHECK(!is_finalized_); DCHECK(!fixed_len_pages_.empty()); DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_); @@ -226,6 +250,7 @@ Status Sorter::Run::AddBatchInternal( // There was not enough space in the last var-len page for this tuple, and // the run could not be extended. Return the fixed-len allocation and exit. cur_fixed_len_page->FreeBytes(sort_tuple_size_); + *allocation_failed = true; return Status::OK(); } } @@ -246,13 +271,21 @@ Status Sorter::Run::AddBatchInternal( // If there are still rows left to process, get a new page for the fixed-length // tuples. If the run is already too long, return. + if (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() >= max_num_of_pages_) { + *allocation_failed = false; + return Status::OK(); + } if (cur_input_index < batch->num_rows()) { bool added; RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added)); - if (!added) return Status::OK(); + if (!added) { + *allocation_failed = true; + return Status::OK(); + } cur_fixed_len_page = &fixed_len_pages_.back(); } } + *allocation_failed = false; return Status::OK(); } @@ -773,22 +806,28 @@ int64_t Sorter::Run::TotalBytes() const { return total_bytes; } -Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* num_processed) { +Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* num_processed, + bool* allocation_failed) { DCHECK(initial_run_); if (has_var_len_slots_) { - return AddBatchInternal<true, true>(batch, start_index, num_processed); + return AddBatchInternal<true, true>( + batch, start_index, num_processed, allocation_failed); } else { - return AddBatchInternal<false, true>(batch, start_index, num_processed); + return AddBatchInternal<false, true>( + batch, start_index, num_processed, allocation_failed); } } Status Sorter::Run::AddIntermediateBatch( RowBatch* batch, int start_index, int* num_processed) { DCHECK(!initial_run_); + bool allocation_failed = false; if (has_var_len_slots_) { - return AddBatchInternal<true, false>(batch, start_index, num_processed); + return AddBatchInternal<true, false>( + batch, start_index, num_processed, &allocation_failed); } else { - return AddBatchInternal<false, false>(batch, start_index, num_processed); + return AddBatchInternal<false, false>( + batch, start_index, num_processed, &allocation_failed); } } @@ -909,8 +948,10 @@ Sorter::Sorter(const TupleRowComparatorConfig& tuple_row_comparator_config, initial_runs_counter_(nullptr), num_merges_counter_(nullptr), in_mem_sort_timer_(nullptr), + in_mem_merge_timer_(nullptr), sorted_data_size_(nullptr), - run_sizes_(nullptr) { + run_sizes_(nullptr), + inmem_run_max_pages_(state->query_options().max_sort_run_size) { switch (tuple_row_comparator_config.sorting_order_) { case TSortingOrder::LEXICAL: compare_less_than_.reset( @@ -922,13 +963,13 @@ Sorter::Sorter(const TupleRowComparatorConfig&
tuple_row_comparator_config, default: DCHECK(false); } - if (estimated_input_size > 0) ComputeSpillEstimate(estimated_input_size); } Sorter::~Sorter() { DCHECK(sorted_runs_.empty()); DCHECK(merging_runs_.empty()); + DCHECK(sorted_inmem_runs_.empty()); DCHECK(unsorted_run_ == nullptr); DCHECK(merge_output_run_ == nullptr); } @@ -977,6 +1018,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) { initial_runs_counter_ = ADD_COUNTER(profile_, "RunsCreated", TUnit::UNIT); } in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime"); + in_mem_merge_timer_ = ADD_TIMER(profile_, "InMemoryMergeTime"); sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES); run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT); @@ -1016,12 +1058,23 @@ Status Sorter::AddBatch(RowBatch* batch) { RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed)); cur_batch_index += num_processed; + if (MustSortAndSpill(cur_batch_index, batch->num_rows())) { - // The current run is full. Sort it, spill it and begin the next one. - int64_t unsorted_run_bytes = unsorted_run_->TotalBytes(); RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_)); - RETURN_IF_ERROR(SortCurrentInputRun()); - RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages()); + int64_t unsorted_run_bytes = unsorted_run_->TotalBytes(); + + if (inmem_run_max_pages_ == 0) { + // The current run is full. Sort it and spill it. + RETURN_IF_ERROR(SortCurrentInputRun()); + sorted_runs_.push_back(unsorted_run_); + unsorted_run_ = nullptr; + RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages()); + } else { + // The memory is full with miniruns. Sort, merge and spill them. + RETURN_IF_ERROR(MergeAndSpill()); + } + + // After memory has been freed by spilling, initialize the next run. unsorted_run_ = run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true)); RETURN_IF_ERROR(unsorted_run_->Init()); @@ -1058,6 +1111,70 @@ int64_t Sorter::GetSortRunBytesLimit() const { } } +Status Sorter::InitializeNewMinirun(bool* allocation_failed) { + // The minirun reached its size limit (max_num_of_pages). + // We should sort the run, append it to the sorted miniruns, and start a new minirun. + DCHECK(!*allocation_failed && unsorted_run_->run_size() == inmem_run_max_pages_); + + // When the first minirun is full, and there are more tuples to come, we first + // need to ensure that these in-memory miniruns can be merged later, by trying + // to reserve pages for the output run of the in-memory merger. Only if this + // initialization was successful can we move on to create the 2nd in-memory run. + // If it fails, or if only 1 in-memory run fits into memory, start spilling. + // No need to initialize 'merge_output_run_' if spilling is disabled, because the + // output will be read directly from the merger in GetNext().
+ if (enable_spilling_ && sorted_inmem_runs_.empty()) { + DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge."; + merge_output_run_ = run_pool_.Add( + new Run(this, output_row_desc_->tuple_descriptors()[0], false)); + RETURN_IF_ERROR(merge_output_run_->TryInit(allocation_failed)); + if (*allocation_failed) { + return Status::OK(); + } + } + RETURN_IF_ERROR(SortCurrentInputRun()); + sorted_inmem_runs_.push_back(unsorted_run_); + unsorted_run_ = + run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true)); + RETURN_IF_ERROR(unsorted_run_->TryInit(allocation_failed)); + if (*allocation_failed) { + unsorted_run_->CloseAllPages(); + } + return Status::OK(); +} + +Status Sorter::MergeAndSpill() { + // The last minirun might have been created just before we ran out of memory. + // In this case it should not be sorted or merged. + if (unsorted_run_->run_size() == 0) { + unsorted_run_ = nullptr; + } else { + RETURN_IF_ERROR(SortCurrentInputRun()); + sorted_inmem_runs_.push_back(unsorted_run_); + } + + // If only 1 run was created, do not merge. + if (sorted_inmem_runs_.size() == 1) { + sorted_runs_.push_back(sorted_inmem_runs_.back()); + sorted_inmem_runs_.clear(); + DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0); + RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages()); + // If 'merge_output_run_' was initialized but no merge was executed, + // set it back to nullptr. + if (merge_output_run_ != nullptr) { + merge_output_run_->CloseAllPages(); + merge_output_run_ = nullptr; + } + } else { + DCHECK(merge_output_run_ != nullptr) << "Should have reserved memory for the merger."; + RETURN_IF_ERROR(MergeInMemoryRuns()); + DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge."; + } + + DCHECK(sorted_inmem_runs_.empty()); + return Status::OK(); +} + bool Sorter::MustSortAndSpill(const int rows_added, const int batch_num_rows) { if (rows_added < batch_num_rows) { return true; } @@ -1086,7 +1203,28 @@ void Sorter::TryLowerMemUpToSortRunBytesLimit() { Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* num_processed) { DCHECK(batch != nullptr); - RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed)); + bool allocation_failed = false; + + RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed, + &allocation_failed)); + + if (inmem_run_max_pages_ > 0) { + start_index += *num_processed; + // We try to add the entire input batch. If it does not fit into 1 minirun, + // initialize a new one. + while (!allocation_failed && start_index < batch->num_rows()) { + RETURN_IF_ERROR(InitializeNewMinirun(&allocation_failed)); + if (allocation_failed) { + break; + } + int processed = 0; + RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, &processed, + &allocation_failed)); + start_index += processed; + *num_processed += processed; + } + } + // Clear any temporary allocations made while materializing the sort tuples. expr_results_pool_.Clear(); return Status::OK(); } @@ -1096,6 +1234,45 @@ Status Sorter::InputDone() { // Sort the tuples in the last run.
RETURN_IF_ERROR(SortCurrentInputRun()); + if (inmem_run_max_pages_ > 0) { + sorted_inmem_runs_.push_back(unsorted_run_); + unsorted_run_ = nullptr; + if (!HasSpilledRuns()) { + if (sorted_inmem_runs_.size() == 1) { + DCHECK(sorted_inmem_runs_.back()->is_pinned()); + DCHECK(merge_output_run_ == nullptr); + RETURN_IF_ERROR(sorted_inmem_runs_.back()->PrepareRead()); + return Status::OK(); + } + if (enable_spilling_) { + DCHECK(merge_output_run_ != nullptr); + merge_output_run_->CloseAllPages(); + merge_output_run_ = nullptr; + } + // 'merge_output_run_' is not initialized for partial sort, because the output + // will be read directly from the merger. + DCHECK(enable_spilling_ || merge_output_run_ == nullptr); + return CreateMerger(sorted_inmem_runs_.size(), false); + } + DCHECK(enable_spilling_); + + if (sorted_inmem_runs_.size() == 1) { + sorted_runs_.push_back(sorted_inmem_runs_.back()); + sorted_inmem_runs_.clear(); + DCHECK_GT(sorted_runs_.back()->run_size(), 0); + RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages()); + } else { + RETURN_IF_ERROR(MergeInMemoryRuns()); + } + DCHECK(sorted_inmem_runs_.empty()); + // Merge intermediate runs until we have a final merge set-up. + return MergeIntermediateRuns(); + } else { + sorted_runs_.push_back(unsorted_run_); + } + + unsorted_run_ = nullptr; + if (sorted_runs_.size() == 1) { // The entire input fit in one run. Read sorted rows in GetNext() directly from the // in-memory sorted run. @@ -1114,10 +1291,19 @@ Status Sorter::InputDone() { return MergeIntermediateRuns(); } + Status Sorter::GetNext(RowBatch* output_batch, bool* eos) { - if (sorted_runs_.size() == 1) { + if (sorted_inmem_runs_.size() == 1 && !HasSpilledRuns()) { + DCHECK(sorted_inmem_runs_.back()->is_pinned()); + return sorted_inmem_runs_.back()->GetNext(output_batch, eos); + } else if (inmem_run_max_pages_ == 0 && sorted_runs_.size() == 1) { DCHECK(sorted_runs_.back()->is_pinned()); return sorted_runs_.back()->GetNext(output_batch, eos); + } else if (inmem_run_max_pages_ > 0 && !HasSpilledRuns()) { + RETURN_IF_ERROR(merger_->GetNext(output_batch, eos)); + // Clear any temporary allocations made by the merger. + expr_results_pool_.Clear(); + return Status::OK(); } else { RETURN_IF_ERROR(merger_->GetNext(output_batch, eos)); // Clear any temporary allocations made by the merger. @@ -1144,6 +1330,7 @@ void Sorter::Close(RuntimeState* state) { } void Sorter::CleanupAllRuns() { + Run::CleanupRuns(&sorted_inmem_runs_); Run::CleanupRuns(&sorted_runs_); Run::CleanupRuns(&merging_runs_); if (unsorted_run_ != nullptr) unsorted_run_->CloseAllPages(); @@ -1160,10 +1347,8 @@ Status Sorter::SortCurrentInputRun() { SCOPED_TIMER(in_mem_sort_timer_); RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_)); } - sorted_runs_.push_back(unsorted_run_); sorted_data_size_->Add(unsorted_run_->TotalBytes()); run_sizes_->UpdateCounter(unsorted_run_->num_tuples()); - unsorted_run_ = nullptr; RETURN_IF_CANCELLED(state_); return Status::OK(); @@ -1236,8 +1421,31 @@ int Sorter::GetNumOfRunsForMerge() const { return max_runs_in_next_merge; } +Status Sorter::MergeInMemoryRuns() { + DCHECK_GE(sorted_inmem_runs_.size(), 2); + DCHECK_GT(sorted_inmem_runs_.back()->run_size(), 0); + + // No need to allocate more memory before doing in-memory merges, because the + // buffers of the in-memory runs are already open and they fit into memory. + // The merge output run is already initialized, too. 
+ + DCHECK(merge_output_run_ != nullptr) << "Should have initialized output run for merge."; + RETURN_IF_ERROR(CreateMerger(sorted_inmem_runs_.size(), false)); + { + SCOPED_TIMER(in_mem_merge_timer_); + RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_)); + } + spilled_runs_counter_->Add(1); + sorted_runs_.push_back(merge_output_run_); + DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0); + merge_output_run_ = nullptr; + DCHECK(sorted_inmem_runs_.empty()); + return Status::OK(); +} + Status Sorter::MergeIntermediateRuns() { DCHECK_GE(sorted_runs_.size(), 2); + DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0); // Attempt to allocate more memory before doing intermediate merges. This may // be possible if other operators have relinquished memory after the sort has built @@ -1248,7 +1456,7 @@ Status Sorter::MergeIntermediateRuns() { int num_of_runs_to_merge = GetNumOfRunsForMerge(); DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge."; - RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge)); + RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge, true)); // If CreateMerger() consumed all the sorted runs, we have set up the final merge. if (sorted_runs_.empty()) return Status::OK(); @@ -1263,9 +1471,16 @@ return Status::OK(); } -Status Sorter::CreateMerger(int num_runs) { +Status Sorter::CreateMerger(int num_runs, bool external) { + std::deque<Run*>* runs_to_merge; + + if (external) { + DCHECK_GE(sorted_runs_.size(), 2); + runs_to_merge = &sorted_runs_; + } else { + runs_to_merge = &sorted_inmem_runs_; + } DCHECK_GE(num_runs, 2); - DCHECK_GE(sorted_runs_.size(), 2); // Clean up the runs from the previous merge. Run::CleanupRuns(&merging_runs_); // from the runs being merged. This is unnecessary overhead that is not required if we // correctly transfer resources. merger_.reset( - new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, true, + new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, external, codegend_heapify_helper_fn_)); vector<function<Status (RowBatch**)>> merge_runs; merge_runs.reserve(num_runs); for (int i = 0; i < num_runs; ++i) { - Run* run = sorted_runs_.front(); + Run* run = runs_to_merge->front(); RETURN_IF_ERROR(run->PrepareRead()); // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge // from this run. merge_runs.emplace_back(bind(mem_fn(&Run::GetNextBatch), run, _1)); - sorted_runs_.pop_front(); + runs_to_merge->pop_front(); merging_runs_.push_back(run); } RETURN_IF_ERROR(merger_->Prepare(merge_runs)); - - num_merges_counter_->Add(1); + if (external) { + num_merges_counter_->Add(1); + } return Status::OK(); } diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index c83bf81cc..39617e8e3 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -105,7 +105,7 @@ class Sorter { /// 'enable_spilling' should be set to false to reduce the number of requested buffers /// if the caller will use AddBatchNoSpill(). /// 'codegend_sort_helper_fn' is a reference to the codegen version of - /// the Sorter::TupleSorter::SortHelp() method. + /// the Sorter::TupleSorter::SortHelper() method. /// 'estimated_input_size' is the total size in bytes of the rows that are estimated /// to be added to this sorter. This is used to decide if the sorter needs to /// proactively spill for the first run. A value of -1 means the estimate is /// unavailable.
@@ -166,20 +166,39 @@ class Sorter { /// Return true if the sorter has any spilled runs. bool HasSpilledRuns() const; + /// Part of the logic in AddBatchNoSpill() that handles the case when + /// AddBatchInternal() returns without processing the entire batch, i.e. when a new + /// minirun needs to be initialized for the incoming rows. + /// Tries to initialize a new minirun. If the initialization was successful, + /// 'allocation_failed' is set to false, otherwise (e.g. when a memory limit is hit) + /// it is set to true. + Status InitializeNewMinirun(bool* allocation_failed) WARN_UNUSED_RESULT; + private: class Page; class Run; - /// Minimum value for sot_run_bytes_limit query option. + /// Minimum value for sort_run_bytes_limit query option. static const int64_t MIN_SORT_RUN_BYTES_LIMIT = 32 << 20; // 32 MB + /// Merges multiple smaller runs in sorted_inmem_runs_ into a single larger merged + /// run that can be spilled page by page during the process, until all pages from + /// 'sorted_inmem_runs_' are consumed and freed. + /// Always performs a one-level merge and spills the output run to disk. Therefore + /// there is no need to deep-copy the input, since the pages containing var-len data + /// remain in memory until the end of the in-memory merge. + /// TODO: delete pages that are consumed during the merge asap. + Status MergeInMemoryRuns() WARN_UNUSED_RESULT; + /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to /// 'merger_'. 'num_runs' indicates how many runs should be covered by the current - /// merging attempt. Returns error if memory allocation fails during in - /// Run::PrepareRead(). The runs to be merged are removed from 'sorted_runs_'. The + /// merging attempt. Returns error if memory allocation fails in Run::PrepareRead(). + /// The runs to be merged are removed from 'sorted_runs_'. + /// If 'external' is set to true, it performs an external merge, and the /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the pages /// containing input run data will be deleted as input runs are read. - Status CreateMerger(int num_runs) WARN_UNUSED_RESULT; + /// If 'external' is false, it creates an in-memory merger for the in-memory miniruns. + /// In this case, 'deep_copy_input' is set to false. + Status CreateMerger(int num_runs, bool external) WARN_UNUSED_RESULT; /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger /// merged run until there are few enough runs to be merged with a single merger. @@ -193,6 +212,10 @@ class Sorter { /// and adding them to 'merged_run'. Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT; + /// Handles cases in AddBatch() where a memory limit is reached and spilling must + /// start. Used only in case of multiple in-memory runs, set by query option + /// MAX_SORT_RUN_SIZE. + inline Status MergeAndSpill() WARN_UNUSED_RESULT; + /// Called once there are no more rows to be added to 'unsorted_run_'. Sorts /// 'unsorted_run_' and appends it to the list of sorted runs. Status SortCurrentInputRun() WARN_UNUSED_RESULT; @@ -266,8 +289,8 @@ class Sorter { const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn_; /// A default codegened function pointer storing nullptr, which is used when the - /// merger is not needed. Used as a default value in constructor, when the CodegenFnPtr - /// is not provided. + /// merger is not needed. Used as a default value in the constructor, when the + /// CodegenFnPtr is not provided.
static const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> default_heapify_helper_fn_; /// Client used to allocate pages from the buffer pool. Not owned. @@ -302,9 +325,13 @@ class Sorter { /// When it is added to sorted_runs_, it is set to NULL. Run* unsorted_run_; - /// List of sorted runs that have been produced but not merged. unsorted_run_ is added - /// to this list after an in-memory sort. Sorted runs produced by intermediate merges - /// are also added to this list during the merge. Runs are added to the object pool. + /// List of quicksorted miniruns before merging in memory. + std::deque<Run*> sorted_inmem_runs_; + + /// List of sorted runs that have been produced but not merged. 'unsorted_run_' is + /// added to this list after an in-memory sort. Sorted runs produced by intermediate + /// merges are also added to this list during the merge. Runs are added to the + /// object pool. std::deque<Run*> sorted_runs_; /// Merger object (intermediate or final) currently used to produce sorted runs. @@ -345,6 +372,9 @@ class Sorter { /// Time spent sorting initial runs in memory. RuntimeProfile::Counter* in_mem_sort_timer_; + /// Time spent merging initial miniruns in memory. + RuntimeProfile::Counter* in_mem_merge_timer_; + /// Total size of the initial runs in bytes. RuntimeProfile::Counter* sorted_data_size_; @@ -353,6 +383,12 @@ class Sorter { /// Flag to enforce sort_run_bytes_limit. bool enforce_sort_run_bytes_limit_ = false; + + /// Maximum number of fixed-length + variable-length pages in an in-memory run, set by + /// query option MAX_SORT_RUN_SIZE. + /// The default value is 0, which means that only 1 in-memory run will be created, and + /// its size will be determined by other limits, e.g. memory or sort_run_bytes_limit. + int inmem_run_max_pages_ = 0; }; } // namespace impala diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index da959234d..af0a74463 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1099,6 +1099,15 @@ Status impala::SetQueryOption(const string& key, const string& value, query_options->__set_max_fragment_instances_per_node(max_num); break; } + case TImpalaQueryOptions::MAX_SORT_RUN_SIZE: { + int32_t int32_t_val = 0; + RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>( + option, value, &int32_t_val)); + RETURN_IF_ERROR( + QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 1)); + query_options->__set_max_sort_run_size(int32_t_val); + break; + } default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index db47664ec..f88e5fc74 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map // time we add or remove a query option to/from the enum TImpalaQueryOptions.
#define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ - TImpalaQueryOptions::MAX_FRAGMENT_INSTANCES_PER_NODE + 1); \ + TImpalaQueryOptions::MAX_SORT_RUN_SIZE + 1); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ @@ -291,7 +291,8 @@ typedef std::unordered_map QUERY_OPT_FN(join_selectivity_correlation_factor, JOIN_SELECTIVITY_CORRELATION_FACTOR, \ TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(max_fragment_instances_per_node, MAX_FRAGMENT_INSTANCES_PER_NODE, \ - TQueryOptionLevel::ADVANCED); + TQueryOptionLevel::ADVANCED); \ + QUERY_OPT_FN(max_sort_run_size, MAX_SORT_RUN_SIZE, TQueryOptionLevel::DEVELOPMENT) ; /// Enforce practical limits on some query options to avoid undesired query state. static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB diff --git a/be/src/util/tuple-row-compare.h b/be/src/util/tuple-row-compare.h index c17253cc3..70d59d2c9 100644 --- a/be/src/util/tuple-row-compare.h +++ b/be/src/util/tuple-row-compare.h @@ -59,6 +59,8 @@ class ComparatorWrapper { } }; +class TupleRowComparator; + /// TupleRowComparatorConfig contains the static state initialized from its corresponding /// thrift structure. It serves as an input for creating instances of the /// TupleRowComparator class. @@ -102,8 +104,8 @@ class TupleRowComparatorConfig { std::vector<bool> nulls_first_; /// Codegened version of TupleRowComparator::Compare(). - typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator* const*, - const TupleRow*, const TupleRow*); + typedef int (*CompareFn)(const TupleRowComparator*, ScalarExprEvaluator* const*, + ScalarExprEvaluator* const*, const TupleRow*, const TupleRow*); CodegenFnPtr<CompareFn> codegend_compare_fn_; private: @@ -156,8 +158,19 @@ class TupleRowComparator { /// hot loops. bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const { return Compare( - ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) - < 0; + ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) < 0; } + + bool ALWAYS_INLINE LessCodegend(const TupleRow* lhs, const TupleRow* rhs) const { + if (codegend_compare_fn_non_atomic_ != nullptr) { + return codegend_compare_fn_non_atomic_(this, + ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) < 0; + } else { + TupleRowComparatorConfig::CompareFn fn = codegend_compare_fn_.load(); + if (fn != nullptr) codegend_compare_fn_non_atomic_ = fn; + } + return Compare( + ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) < 0; } bool ALWAYS_INLINE Less(const Tuple* lhs, const Tuple* rhs) const { @@ -197,6 +210,7 @@ class TupleRowComparator { /// Reference to the codegened function pointer owned by the TupleRowComparatorConfig /// object that was used to create this instance. const CodegenFnPtr<TupleRowComparatorConfig::CompareFn>& codegend_compare_fn_; + mutable TupleRowComparatorConfig::CompareFn codegend_compare_fn_non_atomic_ = nullptr; private: /// Interpreted implementation of Compare(). diff --git a/bin/perf_tools/perf-query.sh b/bin/perf_tools/perf-query.sh index 8e44491d6..3e6ace7d1 100755 --- a/bin/perf_tools/perf-query.sh +++ b/bin/perf_tools/perf-query.sh @@ -64,7 +64,7 @@ sudo echo "test sudo" sudo perf record -F 99 -g -a & perf_pid=$! -~/Impala/bin/impala-shell.sh -q "$1" +${IMPALA_HOME}/bin/impala-shell.sh -q "$1" # Send interrupt to 'perf record'.
We need to issue 'kill' in a new session/process # group via 'setsid', otherwise 'perf record' won't get the signal (because it's diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index e501d9cc2..26651a00f 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -785,6 +785,16 @@ enum TImpalaQueryOptions { // PROCESSING_COST_MIN_THREADS option has higher value. // Valid values are in [1, 128]. Default to 128. MAX_FRAGMENT_INSTANCES_PER_NODE = 156 + + // Configures the in-memory sort algorithm used in the sorter. Determines the + // maximum number of pages in an initial in-memory run (fixed + variable length). + // 0 means unlimited, which will create 1 big run with no in-memory merge phase. + // Setting any other value can create multiple miniruns, which leads to an + // in-memory merge phase. The minimum value in that case is 2. + // Generally, with larger workloads the recommended value is 10 or more to avoid + // high fragmentation of variable-length data. + MAX_SORT_RUN_SIZE = 157; + } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index f482d21b5..2e0157a96 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -614,6 +614,7 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 150: optional bool disable_codegen_cache = false; + 151: optional TCodeGenCacheMode codegen_cache_mode = TCodeGenCacheMode.NORMAL; // See comment in ImpalaService.thrift @@ -633,6 +634,10 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 157: optional i32 max_fragment_instances_per_node = MAX_FRAGMENT_INSTANCES_PER_NODE + + // Configures the in-memory sort algorithm used in the sorter. + // See comment in ImpalaService.thrift + 158: optional i32 max_sort_run_size = 0; } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py index e536b0134..82e43a78f 100644 --- a/tests/query_test/test_sort.py +++ b/tests/query_test/test_sort.py @@ -21,6 +21,10 @@ from copy import copy, deepcopy from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.skip import SkipIfNotHdfsMinicluster +from tests.common.test_vector import ImpalaTestDimension + +# Run sizes (number of pages per run) in the sorter. +MAX_SORT_RUN_SIZE = [0, 2, 20] def split_result_rows(result): @@ -55,6 +59,8 @@ class TestQueryFullSort(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestQueryFullSort, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size', + *MAX_SORT_RUN_SIZE)) if cls.exploration_strategy() == 'core': cls.ImpalaTestMatrix.add_constraint(lambda v:\ @@ -229,10 +235,21 @@ class TestQueryFullSort(ImpalaTestSuite): class TestRandomSort(ImpalaTestSuite): @classmethod def get_workload(self): - return 'functional' + return 'functional-query' - def test_order_by_random(self): + @classmethod + def add_test_dimensions(cls): + super(TestRandomSort, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size', + *MAX_SORT_RUN_SIZE)) + + if cls.exploration_strategy() == 'core': + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet') + + def test_order_by_random(self, vector): """Tests that 'order by random()' works as expected.""" + exec_option = copy(vector.get_value('exec_option')) # "order by random()" with
different seeds should produce different orderings. seed_query = "select * from functional.alltypestiny order by random(%s)" results_seed0 = self.execute_query(seed_query % "0") @@ -242,8 +259,8 @@ class TestRandomSort(ImpalaTestSuite): # Include "random()" in the select list to check that it's sorted correctly. results = transpose_results(self.execute_query( - "select random() as r from functional.alltypessmall order by r").data, - lambda x: float(x)) + "select random() as r from functional.alltypessmall order by r", + exec_option).data, lambda x: float(x)) assert(results[0] == sorted(results[0])) # Like above, but with a limit. @@ -254,22 +271,40 @@ class TestRandomSort(ImpalaTestSuite): # "order by random()" inside an inline view. query = "select r from (select random() r from functional.alltypessmall) v order by r" - results = transpose_results(self.execute_query(query).data, lambda x: float(x)) + results = transpose_results(self.execute_query(query, exec_option).data, + lambda x: float(x)) assert (results == sorted(results)) - def test_analytic_order_by_random(self): + def test_analytic_order_by_random(self, vector): """Tests that a window function over 'order by random()' works as expected.""" + exec_option = copy(vector.get_value('exec_option')) # Since we use the same random seed, the results should be returned in order. query = """select last_value(rand(2)) over (order by rand(2)) from functional.alltypestiny""" - results = transpose_results(self.execute_query(query).data, lambda x: float(x)) + results = transpose_results(self.execute_query(query, exec_option).data, + lambda x: float(x)) assert (results == sorted(results)) + class TestPartialSort(ImpalaTestSuite): """Test class to do functional validation of partial sorts.""" - def test_partial_sort_min_reservation(self, unique_database): + @classmethod + def get_workload(self): + return 'tpch' + + @classmethod + def add_test_dimensions(cls): + super(TestPartialSort, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size', + *MAX_SORT_RUN_SIZE)) + + if cls.exploration_strategy() == 'core': + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format == 'parquet') + + def test_partial_sort_min_reservation(self, vector, unique_database): """Test that the partial sort node can operate if it only gets its minimum memory reservation.""" table_name = "%s.kudu_test" % unique_database @@ -277,10 +312,36 @@ class TestPartialSort(ImpalaTestSuite): "debug_action", "-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0") self.execute_query("""create table %s (col0 string primary key) partition by hash(col0) partitions 8 stored as kudu""" % table_name) + exec_option = copy(vector.get_value('exec_option')) result = self.execute_query( - "insert into %s select string_col from functional.alltypessmall" % table_name) + "insert into %s select string_col from functional.alltypessmall" % table_name, + exec_option) assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile + def test_partial_sort_kudu_insert(self, vector, unique_database): + table_name = "%s.kudu_partial_sort_test" % unique_database + self.execute_query("""create table %s (l_linenumber INT, l_orderkey BIGINT, + l_partkey BIGINT, l_shipdate STRING, l_quantity DECIMAL(12,2), + l_comment STRING, PRIMARY KEY(l_linenumber, l_orderkey) ) + PARTITION BY RANGE (l_linenumber) + ( + PARTITION VALUE = 1, + PARTITION VALUE = 2, + PARTITION VALUE = 3, + PARTITION VALUE = 4, + PARTITION VALUE = 5, + PARTITION VALUE = 
6, + PARTITION VALUE = 7 + ) + STORED AS KUDU""" % table_name) + exec_option = copy(vector.get_value('exec_option')) + result = self.execute_query( + """insert into %s SELECT l_linenumber, l_orderkey, l_partkey, l_shipdate, + l_quantity, l_comment FROM tpch.lineitem limit 300000""" % table_name, + exec_option) + assert "NumModifiedRows: 300000" in result.runtime_profile, result.runtime_profile + assert "NumRowErrors: 0" in result.runtime_profile, result.runtime_profile + class TestArraySort(ImpalaTestSuite): """Tests where there are arrays in the sorting tuple.""" @@ -292,7 +353,8 @@ class TestArraySort(ImpalaTestSuite): @classmethod def add_test_dimensions(cls): super(TestArraySort, cls).add_test_dimensions() - + cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size', + *MAX_SORT_RUN_SIZE)) # The table we use is a parquet table. cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('table_format').file_format == 'parquet')
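Note: the minirun control flow added by this patch is spread across AddBatch(), AddBatchNoSpill(), InitializeNewMinirun() and MergeAndSpill(). The following is a minimal illustrative sketch of how those pieces interact when MAX_SORT_RUN_SIZE > 0 and spilling is enabled. It is not code from the patch; AddMiniruns() is a hypothetical name used only to show the combined flow, while the members and helpers it calls are the ones introduced above.

    // Minimal sketch, assuming the Sorter members introduced by this patch and
    // that 'enable_spilling_' is true. AddMiniruns() is hypothetical.
    Status Sorter::AddMiniruns(RowBatch* batch) {
      DCHECK_GT(inmem_run_max_pages_, 0);
      int index = 0;
      bool allocation_failed = false;
      while (index < batch->num_rows()) {
        int num_processed = 0;
        // Materialize rows into the current minirun until the batch is consumed,
        // the minirun reaches MAX_SORT_RUN_SIZE pages, or reservation runs out.
        RETURN_IF_ERROR(unsorted_run_->AddInputBatch(
            batch, index, &num_processed, &allocation_failed));
        index += num_processed;
        if (index == batch->num_rows()) break;
        if (!allocation_failed) {
          // The minirun is full: quicksort it, append it to 'sorted_inmem_runs_'
          // and try to open the next one (TryInit() may fail to reserve pages).
          RETURN_IF_ERROR(InitializeNewMinirun(&allocation_failed));
        }
        if (allocation_failed) {
          // Reservation exhausted: merge the sorted miniruns in memory, spill
          // the merged output run, then continue with a fresh minirun.
          RETURN_IF_ERROR(MergeAndSpill());
          unsorted_run_ = run_pool_.Add(
              new Run(this, output_row_desc_->tuple_descriptors()[0], true));
          RETURN_IF_ERROR(unsorted_run_->Init());
          allocation_failed = false;
        }
      }
      return Status::OK();
    }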