IMPALA-11444: Fix wrong results when reading wide rows from ORC

After IMPALA-9228, the ORC scanner reads rows into a scratch batch, where
conjuncts and runtime filters are evaluated. The surviving rows are then
transferred to the output row batch. We loop over this until the output
row batch is filled (1024 rows by default) or we finish reading the
current ORC batch (1024 rows by default).

Usually the loop has only one iteration, since the scratch batch capacity
is also 1024 and all rows of the current ORC batch can be materialized
into the scratch batch at once. However, when reading wide rows whose
tuple size exceeds 4096 bytes, the scratch batch capacity is reduced below
1024, i.e. the scratch batch can hold fewer than 1024 rows. In this case
the loop needs more than one iteration.
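
To make the capacity reduction concrete, here is a minimal standalone
sketch. It is not Impala's actual code: the ~4 MB fixed-length buffer
budget and the 2000-column BIGINT tuple size are illustrative assumptions,
chosen only to be consistent with the default of 1024 rows x 4096 bytes.

  // Standalone illustration (not Impala's real classes). Assumption: the
  // scratch batch keeps its fixed-length tuple buffer under ~4 MB, which
  // matches 1024 rows x 4096 bytes.
  #include <algorithm>
  #include <cstdio>

  int main() {
    const long default_batch_size = 1024;            // default BATCH_SIZE
    const long fixed_len_buffer_cap = 1024L * 4096;  // assumed ~4 MB budget
    // A 2000-column BIGINT tuple is roughly 2000 * 8 = 16000 bytes
    // (ignoring null-indicator bytes), well above the 4096-byte threshold.
    const long tuple_byte_size = 2000 * 8;
    const long scratch_capacity =
        std::min(default_batch_size, fixed_len_buffer_cap / tuple_byte_size);
    // Prints 262: far fewer than 1024 rows fit, so filling a 1024-row
    // output batch needs multiple iterations of the transfer loop.
    std::printf("scratch batch capacity: %ld rows\n", scratch_capacity);
    return 0;
  }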

The bug is that we didn't commit rows to the output row batch after each
iteration, so the rows that survived the first iteration are overwritten
in the second iteration.
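
The self-contained model below (simple stand-in vectors, not Impala's
ScratchTupleBatch/RowBatch API) sketches the shape of the loop and why the
survivors must be committed inside it: the scratch storage is reused on
every refill, so rows not committed before the next refill are lost.

  // Simplified model of the transfer loop (stand-in types, not Impala's
  // API). The scratch batch is refilled in place each iteration, so
  // committing only once after the loop would keep just the last refill.
  #include <cstdio>
  #include <vector>

  int main() {
    const int orc_batch_rows = 1024;   // rows delivered by one ORC batch
    const int scratch_capacity = 262;  // reduced capacity for wide rows
    std::vector<int> scratch;          // reused (overwritten) on every refill
    std::vector<int> output;           // the output row batch

    int next_row = 0;
    while (next_row < orc_batch_rows) {
      // Refill the scratch batch: this overwrites whatever it held before.
      scratch.clear();
      while ((int)scratch.size() < scratch_capacity && next_row < orc_batch_rows) {
        scratch.push_back(next_row++);  // pretend every row survives the conjuncts
      }
      // The fix: commit the surviving rows before the next refill. The buggy
      // code deferred this until after the loop, losing all but the last
      // iteration's rows.
      output.insert(output.end(), scratch.begin(), scratch.end());
    }
    std::printf("committed %zu of %d rows\n", output.size(), orc_batch_rows);
    return 0;
  }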

This was fixed by a later optimization (IMPALA-9469) which is missing in
the 3.x branch. This patch picks only the fix from it.

Tests:
 - Add test on wide tables with 2K columns

Change-Id: I09f1c23c817ad012587355c16f37f42d5fb41bff
Reviewed-on: http://gerrit.cloudera.org:8080/18745
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: stiga-huang
Date: 2022-07-19 11:16:11 +08:00
Committed-by: Quanlong Huang
Parent: 1de16355ee
Commit: c4c6bd3986
2 changed files with 56 additions and 5 deletions

be/src/exec/hdfs-orc-scanner.cc

@@ -732,6 +732,13 @@ Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
       int num_tuples_transferred = TransferScratchTuples(dst_batch);
       row_id += num_tuples_transferred;
       num_to_commit += num_tuples_transferred;
+      // Commit rows before next iteration. otherwise, they will be overwritten.
+      // This happens when more than one iteration is needed, e.g. when the capacity of
+      // scratch_batch is small (due to reading wide rows).
+      VLOG_ROW << Substitute(
+          "Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+          num_to_commit, dst_batch->num_rows());
+      RETURN_IF_ERROR(CommitRows(num_tuples_transferred, dst_batch));
     } else {
       if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
       InitTuple(tuple_desc, template_tuple_, tuple);
@@ -746,9 +753,12 @@ Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
       }
     }
   }
-  VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
-      num_to_commit, dst_batch->num_rows());
-  return CommitRows(num_to_commit, dst_batch);
+  if (!do_batch_read) {
+    VLOG_ROW << Substitute("Transfer $0 rows from scratch batch to dst_batch ($1 rows)",
+        num_to_commit, dst_batch->num_rows());
+    return CommitRows(num_to_commit, dst_batch);
+  }
+  return Status::OK();
 }

 Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {

tests/query_test/test_scanners.py

@@ -68,6 +68,11 @@ DEBUG_ACTION_DIMS = [None,
 # Trigger injected soft limit failures when scanner threads check memory limit.
 DEBUG_ACTION_DIMS.append('HDFS_SCANNER_THREAD_CHECK_SOFT_MEM_LIMIT:FAIL@0.5')

+# Map from the test dimension file_format string to the SQL "STORED AS"
+# argument.
+STORED_AS_ARGS = {'text': 'textfile', 'parquet': 'parquet', 'avro': 'avro', 'orc': 'orc',
+                  'seq': 'sequencefile', 'rc': 'rcfile'}
+

 class TestScannersAllTableFormats(ImpalaTestSuite):
   BATCH_SIZES = [0, 1, 16]
@@ -225,7 +230,6 @@ class TestUnmatchedSchema(ImpalaTestSuite):
     self._drop_test_table(vector)


-# Tests that scanners can read a single-column, single-row, 10MB table
 class TestWideRow(ImpalaTestSuite):
   @classmethod
   def get_workload(cls):
@@ -240,7 +244,8 @@ class TestWideRow(ImpalaTestSuite):
     cls.ImpalaTestMatrix.add_constraint(
       lambda v: v.get_value('table_format').file_format != 'hbase')

-  def test_wide_row(self, vector):
+  def test_single_wide_row(self, vector):
+    """Tests that scanners can read a single-column, single-row, 10MB table"""
     if vector.get_value('table_format').file_format == 'kudu':
       pytest.xfail("KUDU-666: Kudu support for large values")
@@ -256,6 +261,42 @@ class TestWideRow(ImpalaTestSuite):
     new_vector.get_value('exec_option')['mem_limit'] = 100 * 1024 * 1024
     self.run_test_case('QueryTest/wide-row', new_vector)

+  @SkipIfABFS.hive
+  @SkipIfADLS.hive
+  @SkipIfIsilon.hive
+  @SkipIfLocal.hive
+  @SkipIfS3.hive
+  def test_multi_wide_rows(self, vector, unique_database):
+    """Tests that scanners can read multi rows of a wide table"""
+    if vector.get_value('table_format').file_format == 'kudu':
+      pytest.xfail("Kudu table can have a maximum of 300 columns")
+    format = STORED_AS_ARGS[vector.get_value('table_format').file_format]
+    create_tbl_stmt = 'create table %s.wide_tbl(col0 bigint' % unique_database
+    for i in range(1, 2000):
+      create_tbl_stmt += ',col%d bigint' % i
+    create_tbl_stmt += ') stored as %s' % format
+    self.client.execute(create_tbl_stmt)
+    insert_stmt = 'insert into %s.wide_tbl ' % unique_database +\
+        'select id' + (',id' * 1999) +\
+        ' from functional.alltypes order by id limit 1000'
+    if format in ('textfile', 'parquet', 'kudu'):
+      self.client.execute(insert_stmt)
+    else:
+      self.run_stmt_in_hive(insert_stmt)
+      self.client.execute('refresh %s.wide_tbl' % unique_database)
+    result = self.client.execute(
+        "select * from %s.wide_tbl where col0 = 1" % unique_database)
+    assert len(result.data) == 1
+    assert type(result.data[0]) == str
+    cols = result.data[0].split('\t')
+    assert len(cols) == 2000
+    for i in range(2000):
+      assert cols[i] == '1'
+
+
 class TestWideTable(ImpalaTestSuite):
   # TODO: expand this to more rows when we have the capability
   NUM_COLS = [250, 500, 1000]