IMPALA-3670: fix sorter buffer mgmt bugs

Also make test_scratch_disk.py more deterministic, by using
max_block_mgr_memory, which doesn't include scanner memory.
The fixed test_scratch_disk.py exercises the other sorter bugs
that occurs when scratch cannot be written.

Testing:
Added a test that does a sort with various memory limits and consumes
the whole output of the sorter (we have many tests of sorts with limits
but limited coverage of sorts without limits).  Ran an exhaustive test
run before posting for review.

This added test reproduced one of the sorter bugs, where var-len blocks
were not always attached to the output batch. The other test was
reproduced by the test change in IMPALA-3669: test_scratch_disk fix.

Change-Id: Ia1a0ddffa0a5b157ab86a376b7b7360a923698d6
Reviewed-on: http://gerrit.cloudera.org:8080/3315
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
This commit is contained in:
Tim Armstrong
2016-06-03 14:19:12 -07:00
parent a1b035a251
commit d23e5505c8
2 changed files with 45 additions and 25 deletions

View File

@@ -670,6 +670,7 @@ Status Sorter::Run::UnpinAllBlocks() {
// A list of var len blocks to replace 'var_len_blocks_'. Note that after we are done
// we may have a different number of blocks, because internal fragmentation may leave
// slightly different amounts of wasted space at the end of each block.
// We need to be careful to clean up these blocks if we run into an error in this method.
vector<BufferedBlockMgr::Block*> sorted_var_len_blocks;
sorted_var_len_blocks.reserve(var_len_blocks_.size());
@@ -692,6 +693,7 @@ Status Sorter::Run::UnpinAllBlocks() {
DCHECK(var_len_copy_block_ == NULL);
}
Status status;
for (int i = 0; i < fixed_len_blocks_.size(); ++i) {
BufferedBlockMgr::Block* cur_fixed_block = fixed_len_blocks_[i];
// Skip converting the pointers if no var-len slots, or if all the values are null
@@ -706,7 +708,8 @@ Status Sorter::Run::UnpinAllBlocks() {
DCHECK(cur_sorted_var_len_block != NULL);
if (cur_sorted_var_len_block->BytesRemaining() < total_var_len) {
bool added;
RETURN_IF_ERROR(TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added));
status = TryAddBlock(UNPIN_PREV, &sorted_var_len_blocks, &added);
if (!status.ok()) goto cleanup_blocks;
DCHECK(added) << "TryAddBlock() with UNPIN_PREV should not fail to add";
cur_sorted_var_len_block = sorted_var_len_blocks.back();
}
@@ -722,7 +725,8 @@ Status Sorter::Run::UnpinAllBlocks() {
if (HasVarLenBlocks()) {
DCHECK_GT(sorted_var_len_blocks.back()->valid_data_len(), 0);
RETURN_IF_ERROR(sorted_var_len_blocks.back()->Unpin());
status = sorted_var_len_blocks.back()->Unpin();
if (!status.ok()) goto cleanup_blocks;
}
// Clear var_len_blocks_ and replace with it with the contents of sorted_var_len_blocks
@@ -731,6 +735,10 @@ Status Sorter::Run::UnpinAllBlocks() {
is_pinned_ = false;
sorter_->spilled_runs_counter_->Add(1);
return Status::OK();
cleanup_blocks:
DeleteAndClearBlocks(&sorted_var_len_blocks);
return status;
}
Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
@@ -802,10 +810,9 @@ Status Sorter::Run::GetNextBatch(RowBatch** output_batch) {
template <bool CONVERT_OFFSET_TO_PTR>
Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
// Var-len offsets are converted only when reading var-len data from unpinned runs.
if (HasVarLenBlocks()) DCHECK_EQ(!is_pinned_, CONVERT_OFFSET_TO_PTR);
// We shouldn't convert var len offsets if there are no blocks, since in that case
// they must all be null or zero-length strings, which don't point into a valid block.
if (!HasVarLenBlocks()) DCHECK(!CONVERT_OFFSET_TO_PTR);
DCHECK_EQ(CONVERT_OFFSET_TO_PTR, HasVarLenBlocks() && !is_pinned_);
if (end_of_fixed_len_block_ &&
fixed_len_blocks_index_ >= static_cast<int>(fixed_len_blocks_.size()) - 1) {
@@ -851,22 +858,14 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
fixed_len_block->buffer() + fixed_len_block_offset_);
if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
DCHECK(!is_pinned_);
// The var-len data is in the next block. We are done with the current block, so
// return rows we've accumulated so far and advance to the next block in the next
// GetNext() call. This is needed for the unpinned case where we can't pin the next
// block before we delete the current block.
if (is_pinned_) {
// Attach block to batch. We don't need the block any more and we don't need to
// reclaim the block's memory, since we already either have the sorted data all in
// memory. The caller can delete the block when it wants to.
output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]);
var_len_blocks_[var_len_blocks_index_] = NULL;
} else {
// To iterate over unpinned runs, we need to exchange this block for the next
// in the next GetNext() call, so we need to hold onto the block and signal to
// the caller that the block is going to be deleted.
output_batch->MarkNeedToReturn();
}
// GetNext() call. This is needed for the unpinned case where we need to exchange
// this block for the next in the next GetNext() call. So therefore we must hold
// onto the current var-len block and signal to the caller that the block is going
// to be deleted.
output_batch->MarkNeedToReturn();
end_of_var_len_block_ = true;
break;
}
@@ -883,11 +882,13 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_]);
fixed_len_blocks_[fixed_len_blocks_index_] = NULL;
// Also attach the last var-len block at eos, since it won't be attached elsewhere.
if (HasVarLenBlocks() &&
fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
output_batch->AddBlock(var_len_blocks_[var_len_blocks_index_]);
var_len_blocks_[var_len_blocks_index_] = NULL;
// Attach the var-len blocks at eos once no more rows will reference the blocks.
if (fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
for (BufferedBlockMgr::Block* var_len_block: var_len_blocks_) {
DCHECK(var_len_block != NULL);
output_batch->AddBlock(var_len_block);
}
var_len_blocks_.clear();
}
} else {
// To iterate over unpinned runs, we need to exchange this block for the next

View File

@@ -61,6 +61,25 @@ class TestQueryFullSort(ImpalaTestSuite):
query, exec_option, table_format=table_format).data)
assert(result[0] == sorted(result[0]))
def test_multiple_mem_limits_full_output(self, vector):
""" Exercise a range of memory limits, returning the full sorted input. """
query = """select o_orderdate, o_custkey, o_comment
from orders
order by o_orderdate"""
exec_option = vector.get_value('exec_option')
table_format = vector.get_value('table_format')
max_block_mgr_memory_values = ['-1', '48M'] # Unlimited and minimum memory.
if self.exploration_strategy() == 'exhaustive' and \
table_format.file_format == 'parquet':
# Test some intermediate values for parquet on exhaustive.
max_block_mgr_memory_values += ['64M', '128M', '256M']
for max_block_mgr_memory in max_block_mgr_memory_values:
exec_option['max_block_mgr_memory'] = max_block_mgr_memory
result = transpose_results(self.execute_query(
query, exec_option, table_format=table_format).data)
assert(result[0] == sorted(result[0]))
def test_sort_join(self, vector):
"""With 200m memory limit this should be a 2-phase sort"""
@@ -99,9 +118,9 @@ class TestQueryFullSort(ImpalaTestSuite):
in the right partition.
"""
query = """select l_orderkey from (
select * from tpch.lineitem limit 300000
select * from lineitem limit 300000
union all
select * from tpch.lineitem limit 300000) t
select * from lineitem limit 300000) t
order by l_orderkey"""
exec_option = vector.get_value('exec_option')