IMPALA-4530: Implement in-memory merge of quicksorted runs

This change aims to decrease back-pressure in the sorter. It offers an
alternative to the existing in-memory run formation strategy and
sorting algorithm by introducing a new in-memory merge level between
the in-memory quicksort and the external merge phase.
Instead of forming one big run, the sorter produces many smaller
in-memory runs (called miniruns), sorts them with quicksort, then
merges them in memory before spilling or serving GetNext().
The external merge phase remains the same.
The behavior is controlled by the MAX_SORT_RUN_SIZE development query
option, which determines the maximum number of pages in a minirun. The
default value of MAX_SORT_RUN_SIZE is 0, which keeps the original
implementation of one big initial in-memory run. Other valid values
are integers of 2 and above (the value 1 is rejected). The recommended
value is 10 or more, to avoid high fragmentation with large workloads
and variable-length data.
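
As a rough illustration of the strategy (a toy sketch in standard C++,
not Impala's actual classes or data layout), minirun formation amounts
to sorting bounded chunks independently instead of one large buffer:

#include <algorithm>
#include <cstddef>
#include <vector>

// Toy model of minirun formation: split the input into chunks of at most
// 'max_run_size' elements and sort each chunk in place. Each sorted chunk
// plays the role of a minirun; a k-way merge then produces the totally
// ordered output, mirroring the new in-memory merge level.
std::vector<std::vector<int>> FormMiniruns(
    const std::vector<int>& input, std::size_t max_run_size) {
  std::vector<std::vector<int>> runs;
  for (std::size_t i = 0; i < input.size(); i += max_run_size) {
    std::size_t end = std::min(i + max_run_size, input.size());
    std::vector<int> run(input.begin() + i, input.begin() + end);
    std::sort(run.begin(), run.end());  // stands in for the in-memory quicksort
    runs.push_back(std::move(run));
  }
  return runs;
}

Sorting several small runs and then merging them performs the same
O(n log n) work overall, but each quicksort touches a bounded working
set, which is the intuition behind the reduced back-pressure.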

Testing:
- added MAX_SORT_RUN_SIZE as an additional test dimension to
  test_sort.py with values [0, 2, 20]
- additional partial sort test case (inserting into a partitioned
  Kudu table)
- manual E2E testing

Change-Id: I58c0ae112e279b93426752895ded7b1a3791865c
Reviewed-on: http://gerrit.cloudera.org:8080/18393
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Csaba Ringhofer <csringhofer@cloudera.com>
Tested-by: Csaba Ringhofer <csringhofer@cloudera.com>
Author: noemi
Date: 2022-04-06 16:36:27 +02:00
Committed by: Csaba Ringhofer
Parent: 58590376ed
Commit: 80c1d2dbaa
12 changed files with 452 additions and 62 deletions


@@ -66,6 +66,7 @@ PartialSortNode::PartialSortNode(
     input_eos_(false),
     sorter_eos_(true) {
   runtime_profile()->AddInfoString("SortType", "Partial");
+  child_get_next_timer_ = ADD_SUMMARY_STATS_TIMER(runtime_profile(), "ChildGetNextTime");
 }

 PartialSortNode::~PartialSortNode() {
@@ -143,7 +144,12 @@ Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   if (input_batch_index_ == input_batch_->num_rows()) {
     input_batch_->Reset();
     input_batch_index_ = 0;
-    RETURN_IF_ERROR(child(0)->GetNext(state, input_batch_.get(), &input_eos_));
+    MonotonicStopWatch timer;
+    timer.Start();
+    Status status = child(0)->GetNext(state, input_batch_.get(), &input_eos_);
+    timer.Stop();
+    RETURN_IF_ERROR(status);
+    child_get_next_timer_->UpdateCounter(timer.ElapsedTime());
   }
   int num_processed;


@@ -86,6 +86,9 @@ class PartialSortNode : public ExecNode {
   const TupleRowComparatorConfig& tuple_row_comparator_config_;

+  /// Min, max, and avg time spent in calling GetNext on the child.
+  RuntimeProfile::SummaryStatsCounter* child_get_next_timer_;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()


@@ -114,13 +114,22 @@ class Sorter::Page {
 ///
 /// Runs are either "initial runs" constructed from the sorter's input by evaluating
 /// the expressions in 'sort_tuple_exprs_' or "intermediate runs" constructed
-/// by merging already-sorted runs. Initial runs are sorted in-place in memory. Once
-/// sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by
+/// by merging already-sorted runs. Initial runs are sorted in-place in memory.
+/// Once sorted, runs can be spilled to disk to free up memory. Sorted runs are merged by
 /// SortedRunMerger, either to produce the final sorted output or to produce another
 /// sorted run.
+/// By default, the size of initial runs is determined by the available memory: the
+/// sorter tries to add batches to the run until some (memory) limit is reached.
+/// Some query options can also limit the size of an initial (or in-memory) run.
+/// SORT_RUN_BYTES_LIMIT triggers spilling after the size of data in the run exceeds the
+/// given threshold (usually expressed in MB or GB).
+/// MAX_SORT_RUN_SIZE allows constructing runs up to a certain size by limiting the
+/// number of pages in the initial runs. These smaller in-memory runs are also referred
+/// to as 'miniruns'. Miniruns are not spilled immediately, but sorted in-place first,
+/// and collected to be merged in memory before spilling the merged output run to disk.
 ///
 /// The expected calling sequence of functions is as follows:
-/// * Init() to initialize the run and allocate initial pages.
+/// * Init() or TryInit() to initialize the run and allocate initial pages.
 /// * Add*Batch() to add batches of tuples to the run.
 /// * FinalizeInput() to signal that no more batches will be added.
 /// * If the run is unsorted, it must be sorted. After that set_sorted() must be called.
@@ -141,13 +150,23 @@ class Sorter::Run {
   /// var-len data into var_len_copy_page_.
   Status Init();

+  /// Similar to Init(), except for the following differences:
+  /// It is only used to initialize miniruns (i.e. when the query option
+  /// MAX_SORT_RUN_SIZE > 0). The first in-memory run is always initialized by calling
+  /// Init(), because that must succeed; the following ones are initialized by TryInit().
+  /// TryInit() allocates one fixed-len page, and one var-len page if
+  /// 'has_var_len_slots_' is true. There is no need for a var_len_copy_page here.
+  /// Sets 'allocation_failed' to false if initialization succeeded, or to true if the
+  /// reservation was not enough.
+  Status TryInit(bool* allocation_failed);
+
   /// Add the rows from 'batch' starting at 'start_index' to the current run. Returns
   /// the number of rows actually added in 'num_processed'. If the run is full (no more
   /// pages can be allocated), 'num_processed' may be less than the number of remaining
   /// rows in the batch. AddInputBatch() materializes the input rows using the
   /// expressions in sorter_->sort_tuple_expr_evals_, while AddIntermediateBatch() just
   /// copies rows.
-  Status AddInputBatch(RowBatch* batch, int start_index, int* num_processed);
+  Status AddInputBatch(
+      RowBatch* batch, int start_index, int* num_processed, bool* allocation_failed);

   Status AddIntermediateBatch(RowBatch* batch, int start_index, int* num_processed);
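
The 'allocation_failed' out-parameter introduced above separates an
expected, recoverable reservation shortfall from a genuine error. A
minimal sketch of the convention, using hypothetical helper names:

// Recoverable "out of reservation" comes back as Status::OK() plus
// *allocation_failed == true, so the caller can react by sorting, merging
// and spilling the miniruns; real failures still surface as a non-OK Status.
Status TryGrowRun(Sorter::Run* run, bool* allocation_failed) {
  *allocation_failed = false;
  if (!run->TryReserveOnePage()) {  // hypothetical helper
    *allocation_failed = true;      // expected outcome: caller spills, retries
    return Status::OK();
  }
  return run->AddOnePage();  // hypothetical helper; errors propagate
}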
@@ -199,6 +218,9 @@ class Sorter::Run {
   bool is_finalized() const { return is_finalized_; }
   bool is_sorted() const { return is_sorted_; }
   void set_sorted() { is_sorted_ = true; }
+  int max_num_of_pages() const { return max_num_of_pages_; }
+  int fixed_len_size() { return fixed_len_pages_.size(); }
+  int run_size() { return fixed_len_pages_.size() + var_len_pages_.size(); }
   int64_t num_tuples() const { return num_tuples_; }
   /// Returns true if we have var-len pages in the run.
   bool HasVarLenPages() const {
@@ -215,8 +237,8 @@ class Sorter::Run {
   /// INITIAL_RUN and HAS_VAR_LEN_SLOTS are template arguments for performance and must
   /// match 'initial_run_' and 'has_var_len_slots_'.
   template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
-  Status AddBatchInternal(
-      RowBatch* batch, int start_index, int* num_processed);
+  Status AddBatchInternal(RowBatch* batch, int start_index, int* num_processed,
+      bool* allocation_failed);

   /// Finalize the list of pages: delete empty final pages and unpin the previous page
   /// if the run is unpinned.
@@ -352,6 +374,12 @@ class Sorter::Run {
   /// Used to implement GetNextBatch() interface required for the merger.
   boost::scoped_ptr<RowBatch> buffered_batch_;

+  /// Max number of fixed-len + var-len pages in an in-memory minirun. It defines the
+  /// length of a minirun.
+  /// The default value is 0, which means that only 1 in-memory run will be created, and
+  /// its size will be determined by other limits, e.g. memory or sort_run_bytes_limit.
+  int max_num_of_pages_;
+
   /// Members used when a run is read in GetNext().
   /// The index into 'fixed_' and 'var_len_pages_' of the pages being read in GetNext().
   int fixed_len_pages_index_;


@@ -121,11 +121,12 @@ Sorter::Run::Run(Sorter* parent, TupleDescriptor* sort_tuple_desc, bool initial_
     is_pinned_(initial_run),
     is_finalized_(false),
     is_sorted_(!initial_run),
-    num_tuples_(0) {}
+    num_tuples_(0),
+    max_num_of_pages_(initial_run ? parent->inmem_run_max_pages_ : 0) {}

 Status Sorter::Run::Init() {
-  int num_to_create = 1 + has_var_len_slots_
-      + (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_);
+  int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && initial_run_ &&
+      (sorter_->enable_spilling_ && max_num_of_pages_ == 0));
   int64_t required_mem = num_to_create * sorter_->page_len_;
   if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
     return Status(Substitute(
@@ -152,9 +153,32 @@ Status Sorter::Run::Init() {
   return Status::OK();
 }

+Status Sorter::Run::TryInit(bool* allocation_failed) {
+  *allocation_failed = true;
+  DCHECK_GT(sorter_->inmem_run_max_pages_, 0);
+  // No need for additional copy page because var-len data is not reordered.
+  // The in-memory merger can copy var-len data directly from the in-memory runs,
+  // which are kept until the merge is finished.
+  int num_to_create = 1 + has_var_len_slots_;
+  int64_t required_mem = num_to_create * sorter_->page_len_;
+  if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
+    return Status::OK();
+  }
+  RETURN_IF_ERROR(AddPage(&fixed_len_pages_));
+  if (has_var_len_slots_) {
+    RETURN_IF_ERROR(AddPage(&var_len_pages_));
+  }
+  if (initial_run_) {
+    sorter_->initial_runs_counter_->Add(1);
+  }
+  *allocation_failed = false;
+  return Status::OK();
+}
+
 template <bool HAS_VAR_LEN_SLOTS, bool INITIAL_RUN>
 Status Sorter::Run::AddBatchInternal(
-    RowBatch* batch, int start_index, int* num_processed) {
+    RowBatch* batch, int start_index, int* num_processed, bool* allocation_failed) {
   DCHECK(!is_finalized_);
   DCHECK(!fixed_len_pages_.empty());
   DCHECK_EQ(HAS_VAR_LEN_SLOTS, has_var_len_slots_);
@@ -226,6 +250,7 @@ Status Sorter::Run::AddBatchInternal(
         // There was not enough space in the last var-len page for this tuple, and
         // the run could not be extended. Return the fixed-len allocation and exit.
         cur_fixed_len_page->FreeBytes(sort_tuple_size_);
+        *allocation_failed = true;
         return Status::OK();
       }
     }
@@ -246,13 +271,21 @@ Status Sorter::Run::AddBatchInternal(
     // If there are still rows left to process, get a new page for the fixed-length
     // tuples. If the run is already too long, return.
+    if (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() >= max_num_of_pages_) {
+      *allocation_failed = false;
+      return Status::OK();
+    }
     if (cur_input_index < batch->num_rows()) {
       bool added;
       RETURN_IF_ERROR(TryAddPage(add_mode, &fixed_len_pages_, &added));
-      if (!added) return Status::OK();
+      if (!added) {
+        *allocation_failed = true;
+        return Status::OK();
+      }
       cur_fixed_len_page = &fixed_len_pages_.back();
     }
   }
+  *allocation_failed = false;
   return Status::OK();
 }
@@ -773,22 +806,28 @@ int64_t Sorter::Run::TotalBytes() const {
   return total_bytes;
 }

-Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* num_processed) {
+Status Sorter::Run::AddInputBatch(RowBatch* batch, int start_index, int* num_processed,
+    bool* allocation_failed) {
   DCHECK(initial_run_);
   if (has_var_len_slots_) {
-    return AddBatchInternal<true, true>(batch, start_index, num_processed);
+    return AddBatchInternal<true, true>(
+        batch, start_index, num_processed, allocation_failed);
   } else {
-    return AddBatchInternal<false, true>(batch, start_index, num_processed);
+    return AddBatchInternal<false, true>(
+        batch, start_index, num_processed, allocation_failed);
   }
 }

 Status Sorter::Run::AddIntermediateBatch(
     RowBatch* batch, int start_index, int* num_processed) {
   DCHECK(!initial_run_);
+  bool allocation_failed = false;
   if (has_var_len_slots_) {
-    return AddBatchInternal<true, false>(batch, start_index, num_processed);
+    return AddBatchInternal<true, false>(
+        batch, start_index, num_processed, &allocation_failed);
   } else {
-    return AddBatchInternal<false, false>(batch, start_index, num_processed);
+    return AddBatchInternal<false, false>(
+        batch, start_index, num_processed, &allocation_failed);
   }
 }
@@ -909,8 +948,10 @@ Sorter::Sorter(const TupleRowComparatorConfig& tuple_row_comparator_config,
     initial_runs_counter_(nullptr),
     num_merges_counter_(nullptr),
     in_mem_sort_timer_(nullptr),
+    in_mem_merge_timer_(nullptr),
     sorted_data_size_(nullptr),
-    run_sizes_(nullptr) {
+    run_sizes_(nullptr),
+    inmem_run_max_pages_(state->query_options().max_sort_run_size) {
   switch (tuple_row_comparator_config.sorting_order_) {
     case TSortingOrder::LEXICAL:
       compare_less_than_.reset(
@@ -922,13 +963,13 @@ Sorter::Sorter(const TupleRowComparatorConfig& tuple_row_comparator_config,
     default:
       DCHECK(false);
   }
   if (estimated_input_size > 0) ComputeSpillEstimate(estimated_input_size);
 }

 Sorter::~Sorter() {
   DCHECK(sorted_runs_.empty());
   DCHECK(merging_runs_.empty());
+  DCHECK(sorted_inmem_runs_.empty());
   DCHECK(unsorted_run_ == nullptr);
   DCHECK(merge_output_run_ == nullptr);
 }
@@ -977,6 +1018,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
     initial_runs_counter_ = ADD_COUNTER(profile_, "RunsCreated", TUnit::UNIT);
   }
   in_mem_sort_timer_ = ADD_TIMER(profile_, "InMemorySortTime");
+  in_mem_merge_timer_ = ADD_TIMER(profile_, "InMemoryMergeTime");
   sorted_data_size_ = ADD_COUNTER(profile_, "SortDataSize", TUnit::BYTES);
   run_sizes_ = ADD_SUMMARY_STATS_COUNTER(profile_, "NumRowsPerRun", TUnit::UNIT);
@@ -1016,12 +1058,23 @@ Status Sorter::AddBatch(RowBatch* batch) {
     RETURN_IF_ERROR(AddBatchNoSpill(batch, cur_batch_index, &num_processed));
     cur_batch_index += num_processed;
     if (MustSortAndSpill(cur_batch_index, batch->num_rows())) {
-      // The current run is full. Sort it, spill it and begin the next one.
-      int64_t unsorted_run_bytes = unsorted_run_->TotalBytes();
       RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
-      RETURN_IF_ERROR(SortCurrentInputRun());
-      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+      int64_t unsorted_run_bytes = unsorted_run_->TotalBytes();
+      if (inmem_run_max_pages_ == 0) {
+        // The current run is full. Sort it and spill it.
+        RETURN_IF_ERROR(SortCurrentInputRun());
+        sorted_runs_.push_back(unsorted_run_);
+        unsorted_run_ = nullptr;
+        RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+      } else {
+        // The memory is full with miniruns. Sort, merge and spill them.
+        RETURN_IF_ERROR(MergeAndSpill());
+      }
+      // After we freed memory by spilling, initialize the next run.
       unsorted_run_ =
           run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true));
       RETURN_IF_ERROR(unsorted_run_->Init());
@@ -1058,6 +1111,70 @@ int64_t Sorter::GetSortRunBytesLimit() const {
   }
 }

+Status Sorter::InitializeNewMinirun(bool* allocation_failed) {
+  // The minirun reached its size limit (max_num_of_pages).
+  // We should sort the run, append it to the sorted miniruns, and start a new minirun.
+  DCHECK(!*allocation_failed && unsorted_run_->run_size() == inmem_run_max_pages_);
+  // When the first minirun is full, and there are more tuples to come, we first
+  // need to ensure that these in-memory miniruns can be merged later, by trying
+  // to reserve pages for the output run of the in-memory merger. Only if this
+  // initialization was successful can we move on to create the 2nd in-memory run.
+  // If it fails, and/or only 1 in-memory run could fit into memory, start spilling.
+  // No need to initialize 'merge_output_run_' if spilling is disabled, because the
+  // output will be read directly from the merger in GetNext().
+  if (enable_spilling_ && sorted_inmem_runs_.empty()) {
+    DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge.";
+    merge_output_run_ = run_pool_.Add(
+        new Run(this, output_row_desc_->tuple_descriptors()[0], false));
+    RETURN_IF_ERROR(merge_output_run_->TryInit(allocation_failed));
+    if (*allocation_failed) {
+      return Status::OK();
+    }
+  }
+  RETURN_IF_ERROR(SortCurrentInputRun());
+  sorted_inmem_runs_.push_back(unsorted_run_);
+  unsorted_run_ =
+      run_pool_.Add(new Run(this, output_row_desc_->tuple_descriptors()[0], true));
+  RETURN_IF_ERROR(unsorted_run_->TryInit(allocation_failed));
+  if (*allocation_failed) {
+    unsorted_run_->CloseAllPages();
+  }
+  return Status::OK();
+}
+
+Status Sorter::MergeAndSpill() {
+  // The last minirun might have been created just before we ran out of memory.
+  // In this case it should not be sorted and merged.
+  if (unsorted_run_->run_size() == 0) {
+    unsorted_run_ = nullptr;
+  } else {
+    RETURN_IF_ERROR(SortCurrentInputRun());
+    sorted_inmem_runs_.push_back(unsorted_run_);
+  }
+  // If only 1 run was created, do not merge.
+  if (sorted_inmem_runs_.size() == 1) {
+    sorted_runs_.push_back(sorted_inmem_runs_.back());
+    sorted_inmem_runs_.clear();
+    DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
+    RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+    // If 'merge_output_run_' was initialized but no merge was executed,
+    // set it back to nullptr.
+    if (merge_output_run_ != nullptr) {
+      merge_output_run_->CloseAllPages();
+      merge_output_run_ = nullptr;
+    }
+  } else {
+    DCHECK(merge_output_run_ != nullptr) << "Should have reserved memory for the merger.";
+    RETURN_IF_ERROR(MergeInMemoryRuns());
+    DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge.";
+  }
+  DCHECK(sorted_inmem_runs_.empty());
+  return Status::OK();
+}
+
 bool Sorter::MustSortAndSpill(const int rows_added, const int batch_num_rows) {
   if (rows_added < batch_num_rows) {
     return true;
@@ -1086,7 +1203,28 @@ void Sorter::TryLowerMemUpToSortRunBytesLimit() {

 Status Sorter::AddBatchNoSpill(RowBatch* batch, int start_index, int* num_processed) {
   DCHECK(batch != nullptr);
-  RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed));
+  bool allocation_failed = false;
+  RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, num_processed,
+      &allocation_failed));
+
+  if (inmem_run_max_pages_ > 0) {
+    start_index += *num_processed;
+    // We try to add the entire input batch. If it does not fit into 1 minirun,
+    // initialize a new one.
+    while (!allocation_failed && start_index < batch->num_rows()) {
+      RETURN_IF_ERROR(InitializeNewMinirun(&allocation_failed));
+      if (allocation_failed) {
+        break;
+      }
+      int processed = 0;
+      RETURN_IF_ERROR(unsorted_run_->AddInputBatch(batch, start_index, &processed,
+          &allocation_failed));
+      start_index += processed;
+      *num_processed += processed;
+    }
+  }
+
   // Clear any temporary allocations made while materializing the sort tuples.
   expr_results_pool_.Clear();
   return Status::OK();
@@ -1096,6 +1234,45 @@ Status Sorter::InputDone() {
   // Sort the tuples in the last run.
   RETURN_IF_ERROR(SortCurrentInputRun());

+  if (inmem_run_max_pages_ > 0) {
+    sorted_inmem_runs_.push_back(unsorted_run_);
+    unsorted_run_ = nullptr;
+    if (!HasSpilledRuns()) {
+      if (sorted_inmem_runs_.size() == 1) {
+        DCHECK(sorted_inmem_runs_.back()->is_pinned());
+        DCHECK(merge_output_run_ == nullptr);
+        RETURN_IF_ERROR(sorted_inmem_runs_.back()->PrepareRead());
+        return Status::OK();
+      }
+      if (enable_spilling_) {
+        DCHECK(merge_output_run_ != nullptr);
+        merge_output_run_->CloseAllPages();
+        merge_output_run_ = nullptr;
+      }
+      // 'merge_output_run_' is not initialized for partial sort, because the output
+      // will be read directly from the merger.
+      DCHECK(enable_spilling_ || merge_output_run_ == nullptr);
+      return CreateMerger(sorted_inmem_runs_.size(), false);
+    }
+    DCHECK(enable_spilling_);
+    if (sorted_inmem_runs_.size() == 1) {
+      sorted_runs_.push_back(sorted_inmem_runs_.back());
+      sorted_inmem_runs_.clear();
+      DCHECK_GT(sorted_runs_.back()->run_size(), 0);
+      RETURN_IF_ERROR(sorted_runs_.back()->UnpinAllPages());
+    } else {
+      RETURN_IF_ERROR(MergeInMemoryRuns());
+    }
+    DCHECK(sorted_inmem_runs_.empty());
+    // Merge intermediate runs until we have a final merge set-up.
+    return MergeIntermediateRuns();
+  } else {
+    sorted_runs_.push_back(unsorted_run_);
+  }
+  unsorted_run_ = nullptr;
+
   if (sorted_runs_.size() == 1) {
     // The entire input fit in one run. Read sorted rows in GetNext() directly from the
     // in-memory sorted run.
@@ -1114,10 +1291,19 @@ Status Sorter::InputDone() {
   return MergeIntermediateRuns();
 }

 Status Sorter::GetNext(RowBatch* output_batch, bool* eos) {
-  if (sorted_runs_.size() == 1) {
+  if (sorted_inmem_runs_.size() == 1 && !HasSpilledRuns()) {
+    DCHECK(sorted_inmem_runs_.back()->is_pinned());
+    return sorted_inmem_runs_.back()->GetNext<false>(output_batch, eos);
+  } else if (inmem_run_max_pages_ == 0 && sorted_runs_.size() == 1) {
     DCHECK(sorted_runs_.back()->is_pinned());
     return sorted_runs_.back()->GetNext<false>(output_batch, eos);
+  } else if (inmem_run_max_pages_ > 0 && !HasSpilledRuns()) {
+    RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
+    // Clear any temporary allocations made by the merger.
+    expr_results_pool_.Clear();
+    return Status::OK();
   } else {
     RETURN_IF_ERROR(merger_->GetNext(output_batch, eos));
     // Clear any temporary allocations made by the merger.
@@ -1144,6 +1330,7 @@ void Sorter::Close(RuntimeState* state) {
 }

 void Sorter::CleanupAllRuns() {
+  Run::CleanupRuns(&sorted_inmem_runs_);
   Run::CleanupRuns(&sorted_runs_);
   Run::CleanupRuns(&merging_runs_);
   if (unsorted_run_ != nullptr) unsorted_run_->CloseAllPages();
@@ -1160,10 +1347,8 @@ Status Sorter::SortCurrentInputRun() {
     SCOPED_TIMER(in_mem_sort_timer_);
     RETURN_IF_ERROR(in_mem_tuple_sorter_->Sort(unsorted_run_));
   }
-  sorted_runs_.push_back(unsorted_run_);
   sorted_data_size_->Add(unsorted_run_->TotalBytes());
   run_sizes_->UpdateCounter(unsorted_run_->num_tuples());
-  unsorted_run_ = nullptr;

   RETURN_IF_CANCELLED(state_);
   return Status::OK();
@@ -1236,8 +1421,31 @@ int Sorter::GetNumOfRunsForMerge() const {
   return max_runs_in_next_merge;
 }

+Status Sorter::MergeInMemoryRuns() {
+  DCHECK_GE(sorted_inmem_runs_.size(), 2);
+  DCHECK_GT(sorted_inmem_runs_.back()->run_size(), 0);
+  // No need to allocate more memory before doing in-memory merges, because the
+  // buffers of the in-memory runs are already open and they fit into memory.
+  // The merge output run is already initialized, too.
+  DCHECK(merge_output_run_ != nullptr) << "Should have initialized output run for merge.";
+  RETURN_IF_ERROR(CreateMerger(sorted_inmem_runs_.size(), false));
+  {
+    SCOPED_TIMER(in_mem_merge_timer_);
+    RETURN_IF_ERROR(ExecuteIntermediateMerge(merge_output_run_));
+  }
+  spilled_runs_counter_->Add(1);
+  sorted_runs_.push_back(merge_output_run_);
+  DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);
+  merge_output_run_ = nullptr;
+  DCHECK(sorted_inmem_runs_.empty());
+  return Status::OK();
+}
+
 Status Sorter::MergeIntermediateRuns() {
   DCHECK_GE(sorted_runs_.size(), 2);
+  DCHECK_GT(sorted_runs_.back()->fixed_len_size(), 0);

   // Attempt to allocate more memory before doing intermediate merges. This may
   // be possible if other operators have relinquished memory after the sort has built
@@ -1248,7 +1456,7 @@ Status Sorter::MergeIntermediateRuns() {
     int num_of_runs_to_merge = GetNumOfRunsForMerge();

     DCHECK(merge_output_run_ == nullptr) << "Should have finished previous merge.";
-    RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge));
+    RETURN_IF_ERROR(CreateMerger(num_of_runs_to_merge, true));

     // If CreateMerger() consumed all the sorted runs, we have set up the final merge.
     if (sorted_runs_.empty()) return Status::OK();
@@ -1263,9 +1471,16 @@
   return Status::OK();
 }

-Status Sorter::CreateMerger(int num_runs) {
+Status Sorter::CreateMerger(int num_runs, bool external) {
+  std::deque<impala::Sorter::Run *>* runs_to_merge;
+  if (external) {
+    DCHECK_GE(sorted_runs_.size(), 2);
+    runs_to_merge = &sorted_runs_;
+  } else {
+    runs_to_merge = &sorted_inmem_runs_;
+  }
   DCHECK_GE(num_runs, 2);
-  DCHECK_GE(sorted_runs_.size(), 2);

   // Clean up the runs from the previous merge.
   Run::CleanupRuns(&merging_runs_);

@@ -1273,24 +1488,25 @@
   // from the runs being merged. This is unnecessary overhead that is not required if we
   // correctly transfer resources.
   merger_.reset(
-      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, true,
+      new SortedRunMerger(*compare_less_than_, output_row_desc_, profile_, external,
           codegend_heapify_helper_fn_));

   vector<function<Status (RowBatch**)>> merge_runs;
   merge_runs.reserve(num_runs);
   for (int i = 0; i < num_runs; ++i) {
-    Run* run = sorted_runs_.front();
+    Run* run = runs_to_merge->front();
     RETURN_IF_ERROR(run->PrepareRead());
     // Run::GetNextBatch() is used by the merger to retrieve a batch of rows to merge
     // from this run.
     merge_runs.emplace_back(bind<Status>(mem_fn(&Run::GetNextBatch), run, _1));
-    sorted_runs_.pop_front();
+    runs_to_merge->pop_front();
     merging_runs_.push_back(run);
   }
   RETURN_IF_ERROR(merger_->Prepare(merge_runs));

-  num_merges_counter_->Add(1);
+  if (external) {
+    num_merges_counter_->Add(1);
+  }
   return Status::OK();
 }
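
Conceptually, the in-memory merger and the external merger run the same
heap-based k-way merge; only the copying and spilling policy differs. A
minimal, self-contained sketch over plain ints (toy code, not
SortedRunMerger's actual interface):

#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Merge already-sorted runs with a min-heap of per-run cursors, the same
// shape of algorithm SortedRunMerger applies to runs of tuple rows.
std::vector<int> MergeRuns(const std::vector<std::vector<int>>& runs) {
  using Cursor = std::tuple<int, std::size_t, std::size_t>;  // (value, run, offset)
  std::priority_queue<Cursor, std::vector<Cursor>, std::greater<Cursor>> heap;
  for (std::size_t r = 0; r < runs.size(); ++r) {
    if (!runs[r].empty()) heap.emplace(runs[r][0], r, 0);
  }
  std::vector<int> out;
  while (!heap.empty()) {
    auto [v, r, i] = heap.top();  // smallest head element across all runs
    heap.pop();
    out.push_back(v);
    if (i + 1 < runs[r].size()) heap.emplace(runs[r][i + 1], r, i + 1);
  }
  return out;
}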


@@ -105,7 +105,7 @@ class Sorter {
   /// 'enable_spilling' should be set to false to reduce the number of requested buffers
   /// if the caller will use AddBatchNoSpill().
   /// 'codegend_sort_helper_fn' is a reference to the codegen version of
-  /// the Sorter::TupleSorter::SortHelp() method.
+  /// the Sorter::TupleSorter::SortHelper() method.
   /// 'estimated_input_size' is the total rows in bytes that are estimated to get added
   /// into this sorter. This is used to decide if sorter needs to proactively spill for
   /// the first run. -1 value means estimate is unavailable.
@@ -166,20 +166,39 @@ class Sorter {
   /// Return true if the sorter has any spilled runs.
   bool HasSpilledRuns() const;

+  /// The logic in AddBatchNoSpill() that handles the different cases when
+  /// AddBatchInternal() returns without processing the entire batch. In this case we
+  /// need to initialize a new minirun for the incoming rows.
+  /// Tries to initialize a new minirun. If the initialization was successful,
+  /// 'allocation_failed' is set to false, otherwise (e.g. if a memory limit is hit)
+  /// to true.
+  Status InitializeNewMinirun(bool* allocation_failed) WARN_UNUSED_RESULT;
+
  private:
   class Page;
   class Run;

-  /// Minimum value for sot_run_bytes_limit query option.
+  /// Minimum value for the sort_run_bytes_limit query option.
   static const int64_t MIN_SORT_RUN_BYTES_LIMIT = 32 << 20; // 32 MB

+  /// Merges multiple smaller runs in 'sorted_inmem_runs_' into a single larger merged
+  /// run that can be spilled page by page during the process, until all pages from
+  /// 'sorted_inmem_runs_' are consumed and freed.
+  /// Always performs a one-level merge and spills the output run to disk. Therefore
+  /// there is no need to deep-copy the input, since the pages containing var-len data
+  /// remain in memory until the end of the in-memory merge.
+  /// TODO: delete pages that are consumed during the merge asap.
+  Status MergeInMemoryRuns() WARN_UNUSED_RESULT;
+
   /// Create a SortedRunMerger from sorted runs in 'sorted_runs_' and assign it to
   /// 'merger_'. 'num_runs' indicates how many runs should be covered by the current
-  /// merging attempt. Returns error if memory allocation fails during in
-  /// Run::PrepareRead(). The runs to be merged are removed from 'sorted_runs_'. The
-  /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the pages
-  /// containing input run data will be deleted as input runs are read.
-  Status CreateMerger(int num_runs) WARN_UNUSED_RESULT;
+  /// merging attempt. Returns an error if memory allocation fails in
+  /// Run::PrepareRead(). The runs to be merged are removed from 'sorted_runs_'.
+  /// If 'external' is set to true, it performs an external merge, and the
+  /// Sorter sets the 'deep_copy_input' flag to true for the merger, since the pages
+  /// containing input run data will be deleted as input runs are read.
+  /// If 'external' is false, it creates an in-memory merger for the in-memory miniruns.
+  /// In this case, 'deep_copy_input' is set to false.
+  Status CreateMerger(int num_runs, bool external) WARN_UNUSED_RESULT;

   /// Repeatedly replaces multiple smaller runs in sorted_runs_ with a single larger
   /// merged run until there are few enough runs to be merged with a single merger.
@@ -193,6 +212,10 @@ class Sorter {
   /// and adding them to 'merged_run'.
   Status ExecuteIntermediateMerge(Sorter::Run* merged_run) WARN_UNUSED_RESULT;

+  /// Handles cases in AddBatch() where a memory limit is reached and spilling must
+  /// start. Used only in case of multiple in-memory runs, set by the query option
+  /// MAX_SORT_RUN_SIZE.
+  inline Status MergeAndSpill() WARN_UNUSED_RESULT;
+
   /// Called once there are no more rows to be added to 'unsorted_run_'. Sorts
   /// 'unsorted_run_' and appends it to the list of sorted runs.
   Status SortCurrentInputRun() WARN_UNUSED_RESULT;
@@ -266,8 +289,8 @@ class Sorter {
   const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn>& codegend_heapify_helper_fn_;

   /// A default codegened function pointer storing nullptr, which is used when the
-  /// merger is not needed. Used as a default value in constructor, when the CodegenFnPtr
-  /// is not provided.
+  /// merger is not needed. Used as a default value in the constructor, when the
+  /// CodegenFnPtr is not provided.
   static const CodegenFnPtr<SortedRunMerger::HeapifyHelperFn> default_heapify_helper_fn_;

   /// Client used to allocate pages from the buffer pool. Not owned.
@@ -302,9 +325,13 @@ class Sorter {
   /// When it is added to sorted_runs_, it is set to NULL.
   Run* unsorted_run_;

+  /// List of quicksorted miniruns before merging in memory.
+  std::deque<Run*> sorted_inmem_runs_;
+
-  /// List of sorted runs that have been produced but not merged. unsorted_run_ is added
-  /// to this list after an in-memory sort. Sorted runs produced by intermediate merges
-  /// are also added to this list during the merge. Runs are added to the object pool.
+  /// List of sorted runs that have been produced but not merged. 'unsorted_run_' is
+  /// added to this list after an in-memory sort. Sorted runs produced by intermediate
+  /// merges are also added to this list during the merge. Runs are added to the
+  /// object pool.
   std::deque<Run*> sorted_runs_;

   /// Merger object (intermediate or final) currently used to produce sorted runs.
@@ -345,6 +372,9 @@ class Sorter {
   /// Time spent sorting initial runs in memory.
   RuntimeProfile::Counter* in_mem_sort_timer_;

+  /// Time spent merging initial miniruns in memory.
+  RuntimeProfile::Counter* in_mem_merge_timer_;
+
   /// Total size of the initial runs in bytes.
   RuntimeProfile::Counter* sorted_data_size_;
@@ -353,6 +383,12 @@ class Sorter {
   /// Flag to enforce sort_run_bytes_limit.
   bool enforce_sort_run_bytes_limit_ = false;

+  /// Maximum number of fixed-length + variable-length pages in an in-memory run, set
+  /// by the query option MAX_SORT_RUN_SIZE.
+  /// The default value is 0, which means that only 1 in-memory run will be created, and
+  /// its size will be determined by other limits, e.g. memory or sort_run_bytes_limit.
+  int inmem_run_max_pages_ = 0;
 };

 } // namespace impala


@@ -1099,6 +1099,15 @@ Status impala::SetQueryOption(const string& key, const string& value,
       query_options->__set_max_fragment_instances_per_node(max_num);
       break;
     }
+    case TImpalaQueryOptions::MAX_SORT_RUN_SIZE: {
+      int32_t int32_t_val = 0;
+      RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
+          option, value, &int32_t_val));
+      RETURN_IF_ERROR(
+          QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 1));
+      query_options->__set_max_sort_run_size(int32_t_val);
+      break;
+    }
     default:
       if (IsRemovedQueryOption(key)) {
         LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";


@@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \
-      TImpalaQueryOptions::MAX_FRAGMENT_INSTANCES_PER_NODE + 1); \
+      TImpalaQueryOptions::MAX_SORT_RUN_SIZE + 1); \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \

@@ -291,7 +291,8 @@
   QUERY_OPT_FN(join_selectivity_correlation_factor, JOIN_SELECTIVITY_CORRELATION_FACTOR, \
       TQueryOptionLevel::ADVANCED) \
   QUERY_OPT_FN(max_fragment_instances_per_node, MAX_FRAGMENT_INSTANCES_PER_NODE, \
-      TQueryOptionLevel::ADVANCED);
+      TQueryOptionLevel::ADVANCED) \
+  QUERY_OPT_FN(max_sort_run_size, MAX_SORT_RUN_SIZE, TQueryOptionLevel::DEVELOPMENT);

 /// Enforce practical limits on some query options to avoid undesired query state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB


@@ -59,6 +59,8 @@ class ComparatorWrapper {
   }
 };

+class TupleRowComparator;
+
 /// TupleRowComparatorConfig contains the static state initialized from its corresponding
 /// thrift structure. It serves as an input for creating instances of the
 /// TupleRowComparator class.

@@ -102,8 +104,8 @@ class TupleRowComparatorConfig {
   std::vector<int8_t> nulls_first_;

   /// Codegened version of TupleRowComparator::Compare().
-  typedef int (*CompareFn)(ScalarExprEvaluator* const*, ScalarExprEvaluator* const*,
-      const TupleRow*, const TupleRow*);
+  typedef int (*CompareFn)(const TupleRowComparator*, ScalarExprEvaluator* const*,
+      ScalarExprEvaluator* const*, const TupleRow*, const TupleRow*);
   CodegenFnPtr<CompareFn> codegend_compare_fn_;

  private:

@@ -156,8 +158,19 @@ class TupleRowComparator {
   /// hot loops.
   bool ALWAYS_INLINE Less(const TupleRow* lhs, const TupleRow* rhs) const {
     return Compare(
-        ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs)
-        < 0;
+        ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) < 0;
+  }
+
+  bool ALWAYS_INLINE LessCodegend(const TupleRow* lhs, const TupleRow* rhs) const {
+    if (codegend_compare_fn_non_atomic_ != nullptr) {
+      return codegend_compare_fn_non_atomic_(this,
+          ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) < 0;
+    } else {
+      TupleRowComparatorConfig::CompareFn fn = codegend_compare_fn_.load();
+      if (fn != nullptr) codegend_compare_fn_non_atomic_ = fn;
+    }
+    return Compare(
+        ordering_expr_evals_lhs_.data(), ordering_expr_evals_rhs_.data(), lhs, rhs) < 0;
   }

   bool ALWAYS_INLINE Less(const Tuple* lhs, const Tuple* rhs) const {

@@ -197,6 +210,7 @@ class TupleRowComparator {
   /// Reference to the codegened function pointer owned by the TupleRowComparatorConfig
   /// object that was used to create this instance.
   const CodegenFnPtr<TupleRowComparatorConfig::CompareFn>& codegend_compare_fn_;
+  mutable TupleRowComparatorConfig::CompareFn codegend_compare_fn_non_atomic_ = nullptr;

  private:
   /// Interpreted implementation of Compare().
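
The LessCodegend() addition above caches the atomically published
code-gen'd function pointer in a plain mutable member, so the hot
comparison loop stops paying for atomic loads once codegen has finished.
The pattern in miniature (toy types, not Impala's):

#include <atomic>

using CompareFn = int (*)(int, int);

struct Comparator {
  std::atomic<CompareFn> codegend{nullptr};  // published by the codegen thread
  mutable CompareFn cached = nullptr;        // non-atomic fast-path copy

  int Compare(int a, int b) const {
    if (cached != nullptr) return cached(a, b);  // common case: no atomic load
    CompareFn fn = codegend.load();
    if (fn != nullptr) {
      cached = fn;  // cache once the codegen'd version is available
      return fn(a, b);
    }
    return (a > b) - (a < b);  // stands in for the interpreted Compare()
  }
};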


@@ -64,7 +64,7 @@ sudo echo "test sudo"
 sudo perf record -F 99 -g -a &
 perf_pid=$!

-~/Impala/bin/impala-shell.sh -q "$1"
+${IMPALA_HOME}/bin/impala-shell.sh -q "$1"

 # Send interrupt to 'perf record'. We need to issue 'kill' in a new session/process
 # group via 'setsid', otherwise 'perf record' won't get the signal (because it's


@@ -785,6 +785,16 @@ enum TImpalaQueryOptions {
   // PROCESSING_COST_MIN_THREADS option has higher value.
   // Valid values are in [1, 128]. Defaults to 128.
   MAX_FRAGMENT_INSTANCES_PER_NODE = 156

+  // Configures the in-memory sort algorithm used in the sorter. Determines the
+  // maximum number of pages in an initial in-memory run (fixed + variable length).
+  // 0 means unlimited, which will create 1 big run with no in-memory merge phase.
+  // Setting any other value can create multiple miniruns, which leads to an
+  // in-memory merge phase. The minimum value in that case is 2.
+  // Generally, with larger workloads the recommended value is 10 or more, to avoid
+  // high fragmentation of variable-length data.
+  MAX_SORT_RUN_SIZE = 157
 }


@@ -614,6 +614,7 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   150: optional bool disable_codegen_cache = false;
   151: optional TCodeGenCacheMode codegen_cache_mode = TCodeGenCacheMode.NORMAL;

   // See comment in ImpalaService.thrift

@@ -633,6 +634,10 @@ struct TQueryOptions {
   // See comment in ImpalaService.thrift
   157: optional i32 max_fragment_instances_per_node = MAX_FRAGMENT_INSTANCES_PER_NODE

+  // Configures the in-memory sort algorithm used in the sorter.
+  // See comment in ImpalaService.thrift
+  158: optional i32 max_sort_run_size = 0;
 }


@@ -21,6 +21,10 @@ from copy import copy, deepcopy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
+from tests.common.test_vector import ImpalaTestDimension
+
+# Run sizes (number of pages per run) in the sorter
+MAX_SORT_RUN_SIZE = [0, 2, 20]

 def split_result_rows(result):
@@ -55,6 +59,8 @@ class TestQueryFullSort(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestQueryFullSort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))

     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:\
@@ -229,10 +235,21 @@ class TestQueryFullSort(ImpalaTestSuite):
 class TestRandomSort(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
-    return 'functional'
+    return 'functional-query'

-  def test_order_by_random(self):
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRandomSort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_constraint(lambda v:
+          v.get_value('table_format').file_format == 'parquet')
+
+  def test_order_by_random(self, vector):
     """Tests that 'order by random()' works as expected."""
+    exec_option = copy(vector.get_value('exec_option'))
     # "order by random()" with different seeds should produce different orderings.
     seed_query = "select * from functional.alltypestiny order by random(%s)"
     results_seed0 = self.execute_query(seed_query % "0")
@@ -242,8 +259,8 @@ class TestRandomSort(ImpalaTestSuite):
     # Include "random()" in the select list to check that it's sorted correctly.
     results = transpose_results(self.execute_query(
-        "select random() as r from functional.alltypessmall order by r").data,
-        lambda x: float(x))
+        "select random() as r from functional.alltypessmall order by r",
+        exec_option).data, lambda x: float(x))
     assert(results[0] == sorted(results[0]))

     # Like above, but with a limit.
@@ -254,22 +271,40 @@
     # "order by random()" inside an inline view.
     query = "select r from (select random() r from functional.alltypessmall) v order by r"
-    results = transpose_results(self.execute_query(query).data, lambda x: float(x))
+    results = transpose_results(self.execute_query(query, exec_option).data,
+        lambda x: float(x))
     assert (results == sorted(results))

-  def test_analytic_order_by_random(self):
+  def test_analytic_order_by_random(self, vector):
     """Tests that a window function over 'order by random()' works as expected."""
+    exec_option = copy(vector.get_value('exec_option'))
     # Since we use the same random seed, the results should be returned in order.
     query = """select last_value(rand(2)) over (order by rand(2)) from
       functional.alltypestiny"""
-    results = transpose_results(self.execute_query(query).data, lambda x: float(x))
+    results = transpose_results(self.execute_query(query, exec_option).data,
+        lambda x: float(x))
     assert (results == sorted(results))


 class TestPartialSort(ImpalaTestSuite):
   """Test class to do functional validation of partial sorts."""

-  def test_partial_sort_min_reservation(self, unique_database):
+  @classmethod
+  def get_workload(self):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestPartialSort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
+    if cls.exploration_strategy() == 'core':
+      cls.ImpalaTestMatrix.add_constraint(lambda v:
+          v.get_value('table_format').file_format == 'parquet')
+
+  def test_partial_sort_min_reservation(self, vector, unique_database):
     """Test that the partial sort node can operate if it only gets its minimum
     memory reservation."""
     table_name = "%s.kudu_test" % unique_database
@@ -277,10 +312,36 @@ class TestPartialSort(ImpalaTestSuite):
         "debug_action", "-1:OPEN:SET_DENY_RESERVATION_PROBABILITY@1.0")
     self.execute_query("""create table %s (col0 string primary key)
         partition by hash(col0) partitions 8 stored as kudu""" % table_name)
+    exec_option = copy(vector.get_value('exec_option'))
     result = self.execute_query(
-        "insert into %s select string_col from functional.alltypessmall" % table_name)
+        "insert into %s select string_col from functional.alltypessmall" % table_name,
+        exec_option)
     assert "PARTIAL SORT" in result.runtime_profile, result.runtime_profile

+  def test_partial_sort_kudu_insert(self, vector, unique_database):
+    table_name = "%s.kudu_partial_sort_test" % unique_database
+    self.execute_query("""create table %s (l_linenumber INT, l_orderkey BIGINT,
+        l_partkey BIGINT, l_shipdate STRING, l_quantity DECIMAL(12,2),
+        l_comment STRING, PRIMARY KEY(l_linenumber, l_orderkey))
+        PARTITION BY RANGE (l_linenumber)
+        (
+          PARTITION VALUE = 1,
+          PARTITION VALUE = 2,
+          PARTITION VALUE = 3,
+          PARTITION VALUE = 4,
+          PARTITION VALUE = 5,
+          PARTITION VALUE = 6,
+          PARTITION VALUE = 7
+        )
+        STORED AS KUDU""" % table_name)
+    exec_option = copy(vector.get_value('exec_option'))
+    result = self.execute_query(
+        """insert into %s SELECT l_linenumber, l_orderkey, l_partkey, l_shipdate,
+        l_quantity, l_comment FROM tpch.lineitem limit 300000""" % table_name,
+        exec_option)
+    assert "NumModifiedRows: 300000" in result.runtime_profile, result.runtime_profile
+    assert "NumRowErrors: 0" in result.runtime_profile, result.runtime_profile


 class TestArraySort(ImpalaTestSuite):
   """Tests where there are arrays in the sorting tuple."""
@@ -292,7 +353,8 @@ class TestArraySort(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestArraySort, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('max_sort_run_size',
+        *MAX_SORT_RUN_SIZE))
     # The table we use is a parquet table.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'parquet')