From e121bc9b0a4374cbdbad85b99dbcc74eb6e224cc Mon Sep 17 00:00:00 2001
From: Juan Yu
Date: Tue, 3 Mar 2015 07:13:48 -0800
Subject: [PATCH] IMPALA-1476: Impala incorrectly handles text data missing a newline on the last line.

I did a local benchmark and the performance impact is minimal (<1%).

Change-Id: I8d84a145acad886c52587258b27d33cff96ea399
(cherry picked from commit 7e750ad5d90007cc85ebe493af4dce7a537ad7c0)
Reviewed-on: http://gerrit.cloudera.org:8080/189
Reviewed-by: Juan Yu
Tested-by: Internal Jenkins
---
 be/src/exec/delimited-text-parser.cc          |  9 +-
 be/src/exec/delimited-text-parser.h           | 17 +++-
 be/src/exec/delimited-text-parser.inline.h    | 10 +-
 be/src/exec/hdfs-text-scanner.cc              | 10 +-
 be/src/exec/scanner-context.cc                |  1 +
 testdata/common/text_delims_table.py          | 42 ++++++++
 testdata/data/table_no_newline.csv            |  5 +
 .../functional/functional_schema_template.sql | 97 +++++++++++++++++++
 .../functional/schema_constraints.csv         |  9 ++
 .../DataErrorsTest/hdfs-scan-node-errors.test | 11 ++-
 .../queries/QueryTest/hdfs-text-scan.test     | 14 +++
 tests/query_test/test_scanners.py             | 45 ++++++++-
 12 files changed, 255 insertions(+), 15 deletions(-)
 create mode 100755 testdata/common/text_delims_table.py
 create mode 100644 testdata/data/table_no_newline.csv
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/hdfs-text-scan.test

diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc
index 2eeca730e..e6e636ef0 100644
--- a/be/src/exec/delimited-text-parser.cc
+++ b/be/src/exec/delimited-text-parser.cc
@@ -35,7 +35,8 @@ DelimitedTextParser::DelimitedTextParser(
     num_cols_(num_cols),
     num_partition_keys_(num_partition_keys),
     is_materialized_col_(is_materialized_col),
-    column_idx_(0) {
+    column_idx_(0),
+    unfinished_tuple_(false) {
   // Escape character should not be the same as tuple or col delim unless it is the
   // empty delimiter.
   DCHECK(escape_char == '\0' || escape_char != tuple_delim);
@@ -123,6 +124,7 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
   while (remaining_len > 0) {
     bool new_tuple = false;
    bool new_col = false;
+    unfinished_tuple_ = true;
 
    if (!last_char_is_escape_) {
      if (tuple_delim_ != '\0' && (**byte_buffer_ptr == tuple_delim_ ||
@@ -154,6 +156,7 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
        row_end_locations[*num_tuples] = *byte_buffer_ptr;
        ++(*num_tuples);
      }
+      unfinished_tuple_ = false;
      last_row_delim_offset_ = **byte_buffer_ptr == '\r' ? remaining_len - 1 : -1;
      if (*num_tuples == max_tuples) {
        ++*byte_buffer_ptr;
@@ -179,6 +182,7 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin
    FillColumns<false>(0, NULL, num_fields, field_locations);
    column_idx_ = num_partition_keys_;
    ++(*num_tuples);
+    unfinished_tuple_ = false;
  }
  return Status::OK;
}
@@ -247,6 +251,9 @@ restart:
  // unlikely.
  int num_escape_chars = 0;
  int before_tuple_end = tuple_start - 2;
+  // TODO: If the scan range is split between an escape character and a tuple
+  // delimiter, before_tuple_end will be -1. We need to scan the previous range
+  // for escape characters in this case.
  for (; before_tuple_end >= 0; --before_tuple_end) {
    if (buffer_start[before_tuple_end] == escape_char_) {
      ++num_escape_chars;
diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h
index 100285961..723c2767c 100644
--- a/be/src/exec/delimited-text-parser.h
+++ b/be/src/exec/delimited-text-parser.h
@@ -113,9 +113,13 @@ class DelimitedTextParser {
   // by the number fields added.
   // field_locations will be updated with the start and length of the fields.
   template <bool process_escapes>
-  void FillColumns(int len, char** last_column, 
+  void FillColumns(int len, char** last_column,
       int* num_fields, impala::FieldLocation* field_locations);
 
+  // Return true if we have not seen a tuple delimiter for the current tuple being
+  // parsed (i.e., the last byte read was not a tuple delimiter).
+  bool HasUnfinishedTuple() { return unfinished_tuple_; }
+
  private:
   // Initialize the parser state.
   void ParserInit(HdfsScanNode* scan_node);
@@ -134,8 +138,8 @@ class DelimitedTextParser {
   void AddColumn(int len, char** next_column_start, int* num_fields,
       FieldLocation* field_locations);
 
-  // Helper routine to parse delimited text using SSE instructions. 
-  // Identical arguments as ParseFieldLocations. 
+  // Helper routine to parse delimited text using SSE instructions.
+  // Identical arguments as ParseFieldLocations.
   // If the template argument, 'process_escapes' is true, this function will handle
   // escapes, otherwise, it will assume the text is unescaped. By using templates,
   // we can special case the un-escaped path for better performance. The unescaped
@@ -180,7 +184,7 @@ class DelimitedTextParser {
   // Whether or not the previous character was the escape character
   bool last_char_is_escape_;
 
-  // Used for special processing of \r. 
+  // Used for special processing of \r.
   // This will be the offset of the last instance of \r from the end of the
   // current buffer being searched unless the last row delimiter was not a \r in which
   // case it will be -1. If the last character in a buffer is \r then the value
@@ -188,7 +192,7 @@ class DelimitedTextParser {
   // then it is set to be one more than the size of the buffer so that if the buffer
   // starts with \n it is processed as \r\n.
   int32_t last_row_delim_offset_;
-  
+
   // Precomputed masks to process escape characters
   uint16_t low_mask_[16];
   uint16_t high_mask_[16];
@@ -205,6 +209,9 @@ class DelimitedTextParser {
 
   // Index to keep track of the current column in the current file
   int column_idx_;
+
+  // True if the last tuple is unfinished (not terminated by a tuple delimiter).
+  bool unfinished_tuple_;
 };
 
 }// namespace impala
diff --git a/be/src/exec/delimited-text-parser.inline.h b/be/src/exec/delimited-text-parser.inline.h
index 8db468ec7..f9c910884 100644
--- a/be/src/exec/delimited-text-parser.inline.h
+++ b/be/src/exec/delimited-text-parser.inline.h
@@ -146,6 +146,11 @@ inline void DelimitedTextParser::ParseSse(int max_tuples,
     ProcessEscapeMask(escape_mask, &last_char_is_escape_, &delim_mask);
   }
 
+  char* last_char = *byte_buffer_ptr + 15;
+  bool last_char_is_unescaped_delim = delim_mask >> 15;
+  unfinished_tuple_ = !(last_char_is_unescaped_delim &&
+      (*last_char == tuple_delim_ || (tuple_delim_ == '\n' && *last_char == '\r')));
+
   int last_col_idx = 0;
   // Process all non-zero bits in the delim_mask from lsb->msb. If a bit
   // is set, the character in that spot is either a field or tuple delimiter.
@@ -220,7 +225,6 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
 
   column_idx_ = num_partition_keys_;
   current_column_has_escape_ = false;
-
   if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) {
     while (LIKELY(remaining_len >= SSEUtil::CHARS_PER_128_BIT_REGISTER)) {
       // Load the next 16 bytes into the xmm register
@@ -282,7 +286,7 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
       last_char_is_escape_ = false;
     }
 
-    if (!last_char_is_escape_ && 
+    if (!last_char_is_escape_ &&
         (*buffer == field_delim_ || *buffer == collection_item_delim_)) {
       AddColumn<process_escapes>(buffer - next_column_start,
           &next_column_start, num_fields, field_locations);
@@ -291,7 +295,7 @@ inline void DelimitedTextParser::ParseSingleTuple(int64_t remaining_len, char* b
     --remaining_len;
     ++buffer;
   }
-  
+
   // Last column does not have a delimiter after it. Add that column and also
   // pad with empty cols if the input is ragged.
   FillColumns<process_escapes>(buffer - next_column_start,
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 490d88446..b775933de 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -259,7 +259,9 @@ Status HdfsTextScanner::FinishScanRange() {
       byte_buffer_read_size_ = 0;
 
       // If compressed text, then there is nothing more to be read.
-      if (decompressor_.get() == NULL) {
+      // TODO: calling FillByteBuffer() at eof() can cause
+      // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this.
+      if (decompressor_.get() == NULL && !stream_->eof()) {
        status = FillByteBuffer(&eosr, NEXT_BLOCK_READ_SIZE);
      }
 
@@ -294,6 +296,12 @@ Status HdfsTextScanner::FinishScanRange() {
      DCHECK_GE(num_tuples, 0);
      COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples);
      RETURN_IF_ERROR(CommitRows(num_tuples));
+    } else if (delimited_text_parser_->HasUnfinishedTuple() &&
+        scan_node_->materialized_slots().empty()) {
+      // If no fields are materialized, we do not update partial_tuple_empty_,
+      // boundary_column_, or boundary_row_. However, we still need to handle a
+      // partial tuple caused by a missing tuple delimiter at the end of the file.
+      RETURN_IF_ERROR(CommitRows(1));
    }
    break;
  }
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 919049ab0..027127b25 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -145,6 +145,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
   DCHECK_GE(read_past_buffer_size, 0);
   if (read_past_buffer_size == 0) {
     io_buffer_bytes_left_ = 0;
+    // TODO: We are leaving io_buffer_ = NULL, revisit.
     return Status::OK;
   }
   DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange(
diff --git a/testdata/common/text_delims_table.py b/testdata/common/text_delims_table.py
new file mode 100755
index 000000000..20287b384
--- /dev/null
+++ b/testdata/common/text_delims_table.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+# Copyright (c) 2015 Cloudera, Inc. All rights reserved.
+
+# Generates test files of a specific length whose contents end with every
+# permutation (with replacement) of the items in suffix_list.
+
+from shutil import rmtree
+from optparse import OptionParser
+from contextlib import contextmanager
+from itertools import product
+import os
+
+parser = OptionParser()
+parser.add_option("--table_dir", dest="table_dir", default=None)
+parser.add_option("--only_newline", dest="only_newline", default=False, action="store_true")
+parser.add_option("--file_len", dest="file_len", type="int")
+
+def generate_testescape_files(table_location, only_newline, file_len):
+  data = ''.join(["1234567890" for _ in xrange(1 + file_len / 10)])
+
+  suffix_list = ["\\", ",", "a"]
+  if only_newline:
+    suffix_list.append("\n")
+  else:
+    suffix_list.append("\r\n")
+
+  if os.path.exists(table_location):
+    rmtree(table_location)
+
+  os.mkdir(table_location)
+  for count, p in enumerate(product(suffix_list, repeat=len(suffix_list))):
+    ending = ''.join(p)
+    content = data[:file_len - len(ending)] + ending
+    with open(os.path.join(table_location, str(count)), 'w') as f:
+      f.write(content)
+
+if __name__ == "__main__":
+  (options, args) = parser.parse_args()
+  if not options.table_dir:
+    parser.error("--table_dir option must be specified")
+
+  generate_testescape_files(options.table_dir, options.only_newline, options.file_len)
diff --git a/testdata/data/table_no_newline.csv b/testdata/data/table_no_newline.csv
new file mode 100644
index 000000000..110f59374
--- /dev/null
+++ b/testdata/data/table_no_newline.csv
@@ -0,0 +1,5 @@
+1,true,123.123,2012-10-24 08:55:00
+2,false,1243.5,2012-10-25 13:40:00
+3,false,24453.325,2008-08-22 09:33:21.123
+4,false,243423.325,2007-05-12 22:32:21.33454
+5,true,243.325,1953-04-22 09:11:33
\ No newline at end of file
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 588629a11..9e13d240f 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -1470,3 +1470,100 @@ value DECIMAL(5,2)
 LOAD DATA LOCAL INPATH '${{env:IMPALA_HOME}}/testdata/data/avro_decimal_tbl.avro' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
 ====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+table_no_newline
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+id INT, col_1 BOOLEAN, col_2 DOUBLE, col_3 TIMESTAMP)
+row format delimited fields terminated by ','
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`hadoop fs -mkdir -p /test-warehouse/table_no_newline && \
+hadoop fs -put -f ${IMPALA_HOME}/testdata/data/table_no_newline.csv /test-warehouse/table_no_newline
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+testescape_16_lf
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col string)
+row format delimited fields terminated by ',' escaped by '\\'
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`${IMPALA_HOME}/testdata/common/text_delims_table.py --table_dir '/tmp/testescape_16_lf' --file_len 16 --only_newline && \
+hadoop fs -mkdir -p /test-warehouse/testescape_16_lf && \
+hadoop fs -put -f /tmp/testescape_16_lf/* /test-warehouse/testescape_16_lf/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+testescape_16_crlf
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col string)
+row format delimited fields terminated by ',' escaped by '\\'
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`${IMPALA_HOME}/testdata/common/text_delims_table.py --table_dir '/tmp/testescape_16_crlf' --file_len 16 && \
+hadoop fs -mkdir -p /test-warehouse/testescape_16_crlf && \
+hadoop fs -put -f /tmp/testescape_16_crlf/* /test-warehouse/testescape_16_crlf/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+testescape_17_lf
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col string)
+row format delimited fields terminated by ',' escaped by '\\'
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`${IMPALA_HOME}/testdata/common/text_delims_table.py --table_dir '/tmp/testescape_17_lf' --file_len 17 --only_newline && \
+hadoop fs -mkdir -p /test-warehouse/testescape_17_lf && \
+hadoop fs -put -f /tmp/testescape_17_lf/* /test-warehouse/testescape_17_lf/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+testescape_17_crlf
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col string)
+row format delimited fields terminated by ',' escaped by '\\'
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`${IMPALA_HOME}/testdata/common/text_delims_table.py --table_dir '/tmp/testescape_17_crlf' --file_len 17 && \
+hadoop fs -mkdir -p /test-warehouse/testescape_17_crlf && \
+hadoop fs -put -f /tmp/testescape_17_crlf/* /test-warehouse/testescape_17_crlf/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+testescape_32_lf
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col string)
+row format delimited fields terminated by ',' escaped by '\\'
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`${IMPALA_HOME}/testdata/common/text_delims_table.py --table_dir '/tmp/testescape_32_lf' --file_len 32 --only_newline && \
+hadoop fs -mkdir -p /test-warehouse/testescape_32_lf && \
+hadoop fs -put -f /tmp/testescape_32_lf/* /test-warehouse/testescape_32_lf/
+====
+---- DATASET
+functional
+---- BASE_TABLE_NAME
+testescape_32_crlf
+---- CREATE
+CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} (
+  col string)
+row format delimited fields terminated by ',' escaped by '\\'
+LOCATION '/test-warehouse/{table_name}';
+---- LOAD
+`${IMPALA_HOME}/testdata/common/text_delims_table.py --table_dir '/tmp/testescape_32_crlf' --file_len 32 && \
+hadoop fs -mkdir -p /test-warehouse/testescape_32_crlf && \
+hadoop fs -put -f /tmp/testescape_32_crlf/* /test-warehouse/testescape_32_crlf/
+====
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index c5d694a99..0321bc404 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -119,3 +119,12 @@ table_name:invalid_decimal_part_tbl2, constraint:restrict_to, table_format:text/
 table_name:invalid_decimal_part_tbl3, constraint:restrict_to, table_format:text/none/none
 table_name:avro_decimal_tbl, constraint:restrict_to, table_format:avro/snap/block
+
+# testescape tables are used for testing text scanner delimiter handling
+table_name:table_no_newline, constraint:restrict_to, table_format:text/none/none
+table_name:testescape_16_lf, constraint:restrict_to, table_format:text/none/none
+table_name:testescape_16_crlf, constraint:restrict_to, table_format:text/none/none
+table_name:testescape_17_lf, constraint:restrict_to, table_format:text/none/none
+table_name:testescape_17_crlf, constraint:restrict_to, table_format:text/none/none
+table_name:testescape_32_lf, constraint:restrict_to, table_format:text/none/none
+table_name:testescape_32_crlf, constraint:restrict_to, table_format:text/none/none
diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-scan-node-errors.test b/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-scan-node-errors.test
index 9ed8642ee..93b31d137 100644
--- a/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-scan-node-errors.test
+++ b/testdata/workloads/functional-query/queries/DataErrorsTest/hdfs-scan-node-errors.test
@@ -76,7 +76,16 @@ select count(*) from functional_text_lzo.bad_text_lzo
 ---- ERRORS
 Blocksize: 536870911 is greater than LZO_MAX_BLOCK_SIZE: 67108864
 ---- RESULTS
-5140
+5141
+---- TYPES
+bigint
+====
+---- QUERY
+select count(field) from functional_text_lzo.bad_text_lzo
+---- ERRORS
+Blocksize: 536870911 is greater than LZO_MAX_BLOCK_SIZE: 67108864
+---- RESULTS
+5141
 ---- TYPES
 bigint
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs-text-scan.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs-text-scan.test
new file mode 100644
index 000000000..1b1eaddec
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs-text-scan.test
@@ -0,0 +1,14 @@
+---- QUERY
+select count(*) from functional.table_no_newline
+---- RESULTS
+5
+---- TYPES
+BIGINT
+====
+---- QUERY
+select count(col_3) from functional.table_no_newline
+---- RESULTS
+5
+---- TYPES
+BIGINT
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 10998ea60..8d93ac9ea 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -199,11 +199,13 @@ class TestParquet(ImpalaTestSuite):
   def test_parquet(self, vector):
     self.run_test_case('QueryTest/parquet', vector)
 
-# We use very small scan ranges to exercise corner cases in the HDFS scanner more
+# We use various scan ranges to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:
-# 1. scan range with no tuple
-# 2. tuple that span across multiple scan ranges
-MAX_SCAN_RANGE_LENGTHS = [1, 2, 5]
+# 1. default scan range
+# 2. scan range with no tuple
+# 3. tuples that span across multiple scan ranges
+# 4. scan range length = 16 to exercise the ParseSse() execution path
+MAX_SCAN_RANGE_LENGTHS = [0, 1, 2, 5, 16, 17, 32]
 
 class TestScanRangeLengths(ImpalaTestSuite):
   @classmethod
@@ -221,6 +223,41 @@ class TestScanRangeLengths(ImpalaTestSuite):
       vector.get_value('max_scan_range_length')
     self.run_test_case('QueryTest/hdfs-tiny-scan', vector)
 
+# More tests for the text scanner:
+# 1. Test files that end without a tuple delimiter
+# 2. Test files with escape characters
+class TestTextScanRangeLengths(ImpalaTestSuite):
+  ESCAPE_TABLE_LIST = ["testescape_16_lf", "testescape_16_crlf",
+      "testescape_17_lf", "testescape_17_crlf",
+      "testescape_32_lf", "testescape_32_crlf"]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestTextScanRangeLengths, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(
+        TestDimension('max_scan_range_length', *MAX_SCAN_RANGE_LENGTHS))
+    cls.TestMatrix.add_constraint(lambda v:\
+        v.get_value('table_format').file_format == 'text' and\
+        v.get_value('table_format').compression_codec == 'none')
+
+  def test_text_scanner(self, vector):
+    vector.get_value('exec_option')['max_scan_range_length'] =\
+        vector.get_value('max_scan_range_length')
+    self.run_test_case('QueryTest/hdfs-text-scan', vector)
+
+    # Test various escape char cases. We have to check the count(*) result against
+    # the count(col) result: if the scan range is split right after the escape
+    # char, the escape char has no effect because we cannot scan backwards into
+    # the previous scan range.
+    for t in self.ESCAPE_TABLE_LIST:
+      expected_result = self.client.execute("select count(col) from " + t)
+      result = self.client.execute("select count(*) from " + t)
+      assert result.data == expected_result.data
+
 @pytest.mark.execute_serially
 class TestScanTruncatedFiles(ImpalaTestSuite):
   TEST_DB = 'test_truncated_file'
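
Editor's note, not part of the patch: below is a minimal Python sketch of the
boundary condition this change fixes. The names are illustrative, and it
assumes '\n' as the tuple delimiter with no escaping. A parser that commits a
tuple only when it sees a delimiter silently drops the last row of a file whose
final line is missing its newline; tracking an unfinished-tuple flag, as the
patch does with unfinished_tuple_ and HasUnfinishedTuple(), recovers that row.

def parse_rows(data, tuple_delim='\n'):
  # Split 'data' into rows, keeping a trailing row that has no delimiter.
  rows = []
  start = 0
  unfinished_tuple = False
  for i, ch in enumerate(data):
    unfinished_tuple = True
    if ch == tuple_delim:
      rows.append(data[start:i])
      start = i + 1
      unfinished_tuple = False
  if unfinished_tuple:
    # Mirrors the fix: commit the trailing tuple at end of input even though
    # no delimiter terminated it.
    rows.append(data[start:])
  return rows

assert parse_rows("a\nb\nc\n") == ["a", "b", "c"]
assert parse_rows("a\nb\nc") == ["a", "b", "c"]  # last line missing its newline

This is the behavior the new hdfs-text-scan.test checks: all five rows of
table_no_newline.csv are counted even though the file ends without a newline.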