From cbadb4eac4e13b9b83e3a95c501b77aa9011f4cb Mon Sep 17 00:00:00 2001 From: Alan Choi Date: Thu, 12 Jul 2012 17:54:34 -0700 Subject: [PATCH] When a scan range begins at the starting point of a tuple, we'll miss that tuple. This patch fixes this problem. review: 162 --- be/src/exec/delimited-text-parser.cc | 6 +- be/src/exec/delimited-text-parser.h | 1 + be/src/exec/hdfs-text-scanner.cc | 88 ++++++++++++------- be/src/exec/hdfs-text-scanner.h | 3 + be/src/runtime/runtime-state.cc | 7 +- be/src/runtime/runtime-state.h | 3 +- be/src/testutil/in-process-query-executor.cc | 14 ++- common/thrift/ImpalaInternalService.thrift | 3 + common/thrift/ImpalaService.thrift | 10 ++- .../impala/planner/HBaseScanNode.java | 5 +- .../cloudera/impala/planner/HdfsScanNode.java | 25 ++++-- .../com/cloudera/impala/planner/Planner.java | 41 ++++----- .../com/cloudera/impala/planner/ScanNode.java | 4 +- .../com/cloudera/impala/service/Executor.java | 29 +++++- .../com/cloudera/impala/service/Frontend.java | 2 +- .../cloudera/impala/planner/PlannerTest.java | 11 ++- .../impala/service/BaseQueryTest.java | 23 +++-- .../cloudera/impala/service/QueryTest.java | 29 ++++++ .../impala/testutil/TestExecContext.java | 23 ++++- .../cloudera/impala/testutil/TestUtils.java | 2 + testdata/TinyTable/data.csv | 3 + .../functional/functional_schema_template.sql | 19 ++++ .../queries/QueryTest/hdfs-tiny-scan.test | 16 ++++ testdata/workloads/tpch/queries/tpch-q1.test | 2 +- 24 files changed, 277 insertions(+), 92 deletions(-) create mode 100644 testdata/TinyTable/data.csv create mode 100644 testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc index 8661d92c0..8342f2c35 100644 --- a/be/src/exec/delimited-text-parser.cc +++ b/be/src/exec/delimited-text-parser.cc @@ -142,11 +142,10 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin // Find the first instance of the tuple delimiter. This will // find the start of the first full tuple in buffer by looking for the end of // the previous tuple. -// TODO: most of this is not tested. We need some tailored data to exercise the boundary -// cases int DelimitedTextParser::FindFirstInstance(char* buffer, int len) { int tuple_start = 0; char* buffer_start = buffer; + bool found = false; restart: if (CpuInfo::Instance()->IsSupported(CpuInfo::SSE4_2)) { __m128i xmm_buffer, xmm_tuple_mask; @@ -163,6 +162,7 @@ restart: _mm_cmpestrm(xmm_tuple_search_, 1, xmm_buffer, chr_count, SSEUtil::STRCHR_MODE); int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0); if (tuple_mask != 0) { + found = true; for (int i = 0; i < SSEUtil::CHARS_PER_128_BIT_REGISTER; ++i) { if ((tuple_mask & SSEUtil::SSE_BITMASK[i]) != 0) { tuple_start += i + 1; @@ -180,6 +180,7 @@ restart: char c = *buffer++; if (c == tuple_delim_) { tuple_start = i + 1; + found = true; break; } } @@ -212,6 +213,7 @@ restart: } } + if (!found) return -1; return tuple_start; } diff --git a/be/src/exec/delimited-text-parser.h b/be/src/exec/delimited-text-parser.h index f5ab33a4f..b1f075120 100644 --- a/be/src/exec/delimited-text-parser.h +++ b/be/src/exec/delimited-text-parser.h @@ -65,6 +65,7 @@ class DelimitedTextParser { // delimiter from the starting offset. // Used to find the start of a tuple if jumping into the middle of a text file. // Also used to find the sync marker for Sequenced and RC files.
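
Note on the FindFirstInstance() change above: the parser can now report that the buffer contains no tuple delimiter at all, instead of returning an offset as if one had been found. A minimal sketch of the scalar (non-SSE4.2) path with the same return convention; the function name is illustrative and the escape-character handling of the real parser is omitted:

// Returns the offset just past the first tuple delimiter in [buffer, buffer + len),
// i.e. the start of the first complete tuple, or -1 if the delimiter does not occur.
// The caller must then either read more bytes or give up at the end of the scan range.
int FindFirstDelimiterSketch(const char* buffer, int len, char tuple_delim) {
  for (int i = 0; i < len; ++i) {
    if (buffer[i] == tuple_delim) return i + 1;
  }
  return -1;
}
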
+ // If no tuple delimiter is found within the buffer, return -1; int FindFirstInstance(char* buffer, int len); // Find a sync block if jumping into the middle of a Sequence or RC file. diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 418e1952e..127b7d3ec 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -84,11 +84,33 @@ Status HdfsTextScanner::InitCurrentScanRange(HdfsPartitionDescriptor* hdfs_parti boundary_row_.Clear(); delimited_text_parser_->ParserReset(); + eosr_ = false; + // Offset may not point to tuple boundary if (scan_range->offset_ != 0) { COUNTER_SCOPED_TIMER(parse_delimiter_timer_); int first_tuple_offset = delimited_text_parser_->FindFirstInstance( byte_buffer_, byte_buffer_read_size_); + // Find the first tuple offset. We might have to read through a few buffers before + // we can find them. + while (first_tuple_offset < 0) { + // If we can't find the first tuple delimiter before reading the end of our scan + // range (or end of file), we're done. The bytes we've skipped are processed by + // some earlier scan range (not necessarily the previous one). + current_range_remaining_len_ -= byte_buffer_read_size_; + if (current_range_remaining_len_ <= 0) { + eosr_ = true; + return Status::OK; + } + RETURN_IF_ERROR(FillByteBuffer(current_range_remaining_len_)); + if (byte_buffer_read_size_ == 0) { + eosr_ = true; + return Status::OK; + } + first_tuple_offset = delimited_text_parser_->FindFirstInstance( + byte_buffer_, byte_buffer_read_size_); + } + DCHECK_GE(first_tuple_offset, 0); DCHECK_LE(first_tuple_offset, min(state_->file_buffer_size(), current_range_remaining_len_)); byte_buffer_ptr_ += first_tuple_offset; @@ -164,30 +186,46 @@ Status HdfsTextScanner::GetNext(RowBatch* row_batch, bool* eosr) { // The only case where this will be false if the row batch fills up. *eosr = true; + // If we've reached EOS during the init scan range without finding a tuple delimiter, + // we're done. + if (eosr_) return Status::OK; + // This loop contains a small state machine: // 1. byte_buffer_ptr_ != byte_buffer_end_: no need to read more, process what's in // the read buffer. // 2. current_range_remaining_len_ > 0: keep reading from the current file location - // 3. current_range_remaining_len_ == 0: done with current range but might need more - // to complete the last tuple. - // 4. current_range_remaining_len_ < 0: Reading past this scan range to finish tuple + // 3. current_range_remaining_len_ <= 0: Reading past this scan range to finish tuple while (true) { if (byte_buffer_ptr_ == byte_buffer_end_) { - if (current_range_remaining_len_ < 0) { + if (current_range_remaining_len_ <= 0) { + // We are at the end of the scan range and need to check if we need to read past + // the scan range to finish an incomplete tuple. There are two cases when this + // can occur: + // 1. We are in the middle of a row and we've seen at least one column. The + // text parser stores this state. + // 2. we are in the middle of the first column. We can identify this case if + // there are bytes in the boundary column buffer or the last parsed column + // start location is still in this scan range. + bool incomplete_tuple = !boundary_column_.Empty() || + !delimited_text_parser_->AtTupleStart() || + (col_start != NULL && col_start != byte_buffer_ptr_); + // We want to minimize how much we read next. It is past the end of this block // and likely on another node. 
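
The loop added to InitCurrentScanRange() above matters whenever the file buffer is smaller than the distance from the start of the scan range to the first tuple delimiter, which the new FILE_BUFFER_SIZE option makes easy to provoke in tests. Below is a self-contained toy model of that skip-ahead, assuming an in-memory file instead of a byte stream; the function and parameter names are made up for the illustration:

#include <algorithm>
#include <cstdio>
#include <cstring>

// Returns the file offset of the first tuple that starts inside the scan range
// [range_offset, range_offset + range_len), searching at most buffer_size bytes
// at a time, or -1 if no tuple starts in the range (those bytes then belong to
// an earlier range, mirroring the eosr_ case in the patch).
int FirstTupleStart(const char* data, int file_len, int range_offset,
                    int range_len, int buffer_size, char delim) {
  if (range_offset == 0) return 0;  // a range at offset 0 starts on a tuple boundary
  int pos = range_offset;
  int remaining = range_len;
  while (remaining > 0 && pos < file_len) {
    int n = std::min(std::min(buffer_size, remaining), file_len - pos);
    for (int i = 0; i < n; ++i) {
      if (data[pos + i] == delim) return pos + i + 1;  // just past the delimiter
    }
    pos += n;
    remaining -= n;
  }
  return -1;
}

int main() {
  // Same contents as testdata/TinyTable/data.csv.
  const char* file = "aaaaaaa,bbbbbbb\nccccc,dddd\neeeeeeee,f\n";
  int len = static_cast<int>(std::strlen(file));
  // A 5-byte range at offset 5, read with 5-byte buffers, contains no '\n':
  // it produces no tuples and prints -1.
  std::printf("%d\n", FirstTupleStart(file, len, 5, 5, 5, '\n'));
  // A 20-byte range at offset 10 reaches the '\n' at offset 15 and prints 16.
  std::printf("%d\n", FirstTupleStart(file, len, 10, 20, 5, '\n'));
  return 0;
}
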
// TODO: Set it to be the average size of a row RETURN_IF_ERROR(FillByteBuffer(NEXT_BLOCK_READ_SIZE)); if (byte_buffer_read_size_ == 0) { - // There was an incomplete tuple at the end of the file. The file - // is corrupt. - if (state_->LogHasSpace()) { - state_->error_stream() << "Incomplete tuple at end of file: " - << current_byte_stream_->GetLocation() << endl; - state_->LogErrorStream(); + if (incomplete_tuple) { + // There was an incomplete tuple at the end of the file. The file + // is corrupt. + if (state_->LogHasSpace()) { + state_->error_stream() << "Incomplete tuple at end of file: " + << current_byte_stream_->GetLocation() << endl; + state_->LogErrorStream(); + } + LOG(ERROR) << "Incomplete tuple at end of file: " + << current_byte_stream_->GetLocation(); } - LOG(ERROR) << "Incomplete tuple at end of file: " - << current_byte_stream_->GetLocation(); current_range_remaining_len_ = 0; slot_idx_ = 0; delimited_text_parser_->ParserReset(); @@ -195,23 +233,6 @@ Status HdfsTextScanner::GetNext(RowBatch* row_batch, bool* eosr) { byte_buffer_ptr_ = NULL; break; } - } else if (current_range_remaining_len_ == 0) { - // Check if a tuple is straddling this block and the next: - // 1. boundary_column_ is not empty - // 2. if we are halfway through reading a tuple: !AtStart. - // 3. We have are in the middle of the first column - // We need to continue scanning until the end of the tuple. Note that - // boundary_column_ will be empty if we are on a column boundary, - // but could still be inside a tuple. Similarly column_idx_ could be - // first_materialised_col_idx if we are in the middle of reading the first - // column. Therefore we need both checks. - // TODO: test that hits this condition. - if (!boundary_column_.Empty() || !delimited_text_parser_->AtTupleStart() || - (col_start != NULL && col_start != byte_buffer_ptr_)) { - current_range_remaining_len_ = -1; - continue; - } - break; } else { // Continue reading from the current file location DCHECK_GE(current_range_remaining_len_, 0); @@ -234,7 +255,7 @@ Status HdfsTextScanner::GetNext(RowBatch* row_batch, bool* eosr) { int max_tuples = row_batch->capacity() - row_batch->num_rows(); // We are done with the current scan range, just parse for one more tuple to finish // it off - if (current_range_remaining_len_ < 0) { + if (current_range_remaining_len_ <= 0) { max_tuples = 1; } @@ -252,7 +273,8 @@ Status HdfsTextScanner::GetNext(RowBatch* row_batch, bool* eosr) { DCHECK_GT(bytes_processed, 0); int num_tuples_materialized = 0; - if (scan_node_->materialized_slots().size() != 0) { + if (scan_node_->materialized_slots().size() != 0 && + (num_fields > 0 || num_tuples > 0)) { // There can be one partial tuple which returned no more fields from this buffer. DCHECK_LE(num_tuples, num_fields + 1); if (!boundary_column_.Empty()) { @@ -299,14 +321,14 @@ Status HdfsTextScanner::GetNext(RowBatch* row_batch, bool* eosr) { if (current_range_remaining_len_ < 0) { DCHECK_LE(num_tuples, 1); - // Just finished off this scan range if (num_tuples == 1) { + // Just finished off this scan range DCHECK(boundary_column_.Empty()); DCHECK_EQ(slot_idx_, 0); current_range_remaining_len_ = 0; byte_buffer_ptr_ = byte_buffer_end_; + break; } - break; } // The row batch is full. 
We'll pick up where we left off in the next diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 0b32be353..cb1fd24fa 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -190,6 +190,9 @@ class HdfsTextScanner : public HdfsScanner { // range. When <= 0, GetNext will prepare to exit. int current_range_remaining_len_; + // True if we've reached the end of scan range during init. + bool eosr_; + // Matching typedef for WriteAlignedTuples for codegen. Refer to comments for // that function. typedef int (*WriteTuplesFn)(HdfsTextScanner*, RowBatch*, FieldLocation*, int, int, diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index df0a31838..a5f7a59d2 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -30,7 +30,6 @@ RuntimeState::RuntimeState( const TUniqueId& fragment_id, const TQueryOptions& query_options, const string& now, ExecEnv* exec_env) : obj_pool_(new ObjectPool()), - file_buffer_size_(DEFAULT_FILE_BUFFER_SIZE), profile_(obj_pool_.get(), "RuntimeState"), is_cancelled_(false) { Status status = Init(fragment_id, query_options, now, exec_env); @@ -39,9 +38,9 @@ RuntimeState::RuntimeState( RuntimeState::RuntimeState() : obj_pool_(new ObjectPool()), - file_buffer_size_(DEFAULT_FILE_BUFFER_SIZE), profile_(obj_pool_.get(), "RuntimeState") { query_options_.batch_size = DEFAULT_BATCH_SIZE; + query_options_.file_buffer_size = DEFAULT_FILE_BUFFER_SIZE; } RuntimeState::~RuntimeState() { @@ -65,7 +64,9 @@ Status RuntimeState::Init( if (query_options_.batch_size <= 0) { query_options_.batch_size = DEFAULT_BATCH_SIZE; } - + if (query_options.file_buffer_size <= 0) { + query_options_.file_buffer_size = DEFAULT_FILE_BUFFER_SIZE; + } return Status::OK; } diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index eeaac5d50..9c2317959 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -47,7 +47,7 @@ class RuntimeState { const DescriptorTbl& desc_tbl() const { return *desc_tbl_; } void set_desc_tbl(DescriptorTbl* desc_tbl) { desc_tbl_ = desc_tbl; } int batch_size() const { return query_options_.batch_size; } - int file_buffer_size() const { return file_buffer_size_; } + int file_buffer_size() const { return query_options_.file_buffer_size; } bool abort_on_error() const { return query_options_.abort_on_error; } int max_errors() const { return query_options_.max_errors; } const TimestampValue* now() const { return now_.get(); } @@ -102,7 +102,6 @@ class RuntimeState { DescriptorTbl* desc_tbl_; boost::scoped_ptr obj_pool_; - int file_buffer_size_; // A buffer for error messages. // The entire error stream is written to the error_log_ in log_error_stream(). 
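
One more note on the GetNext() change in hdfs-text-scanner.cc above: a zero-byte read past the end of the scan range is now treated as a corrupt file only when a tuple really is left unfinished. The straddle test, restated with the member names from the patch (a paraphrase of the patched condition, not new behavior):

// A row straddles the end of the scan range when any of the following hold.
// boundary_column_ can be empty on a column boundary while still mid-row, and the
// parser can be AtTupleStart() while part-way through the first column, so all
// three checks are needed:
bool incomplete_tuple =
    !boundary_column_.Empty() ||                            // partial column carried over
    !delimited_text_parser_->AtTupleStart() ||              // mid-row: a field already ended
    (col_start != NULL && col_start != byte_buffer_ptr_);   // part-way through the first column
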
diff --git a/be/src/testutil/in-process-query-executor.cc b/be/src/testutil/in-process-query-executor.cc index 6b8996fc7..71358d5ac 100644 --- a/be/src/testutil/in-process-query-executor.cc +++ b/be/src/testutil/in-process-query-executor.cc @@ -43,6 +43,12 @@ DEFINE_int32(batch_size, 0, "batch size to be used by backend; a batch size of 0 indicates the " "backend's default batch size"); +DEFINE_int32(file_buffer_size, 0, + "file buffer size used by text parsing; size of 0 indicates the " + "backend's default file buffer size"); +DEFINE_int32(max_scan_range_length, 0, + "maximum length of the scan range; only applicable to HDFS scan range; a length of 0" + " indicates backend default"); DEFINE_bool(abort_on_error, false, "if true, abort query when encountering any error"); DEFINE_int32(max_errors, 100, "number of errors to report"); DEFINE_int32(num_nodes, 1, @@ -97,7 +103,8 @@ Status InProcessQueryExecutor::Setup() { return Status::OK; } -Status InProcessQueryExecutor::Exec(const string& query, vector* col_types) { +Status InProcessQueryExecutor::Exec(const string& query, + vector* col_types) { query_profile_.reset(new RuntimeProfile(obj_pool_.get(), "InProcessQueryExecutor")); RuntimeProfile::Counter* plan_gen_counter = ADD_COUNTER(query_profile_, "PlanGeneration", TCounterType::CPU_TICKS); @@ -112,6 +119,8 @@ Status InProcessQueryExecutor::Exec(const string& query, vector* query_options.disable_codegen = !FLAGS_enable_jit; query_options.max_errors = FLAGS_max_errors; query_options.num_nodes = FLAGS_num_nodes; + query_options.file_buffer_size = FLAGS_file_buffer_size; + query_options.max_scan_range_length = FLAGS_max_scan_range_length; try { COUNTER_SCOPED_TIMER(plan_gen_counter); @@ -310,6 +319,9 @@ Status InProcessQueryExecutor::Explain(const string& query, string* explain_plan query_options.disable_codegen = !FLAGS_enable_jit; query_options.max_errors = FLAGS_max_errors; query_options.num_nodes = FLAGS_num_nodes; + query_options.file_buffer_size = FLAGS_file_buffer_size; + query_options.max_scan_range_length = FLAGS_max_scan_range_length; + TQueryRequest query_request; query_request.__set_stmt(query.c_str()); query_request.__set_queryOptions(query_options); diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index 82f328920..e117c0142 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -31,6 +31,9 @@ struct TQueryOptions { 5: required bool return_as_ascii = 1 6: required i32 num_nodes = JavaConstants.NUM_NODES_ALL + + 7: required i64 max_scan_range_length = 0 + 8: required i32 file_buffer_size = 0 } // Parameters for the execution of a plan fragment on a particular node. diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 8c36d9ab6..709938dee 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -32,7 +32,15 @@ enum ImpalaQueryOptions { // more nodes than numNodes with plan fragments for this query, but at most // numNodes would be active at any point in time) // Constants (NUM_NODES_ALL, NUM_NODES_ALL_RACKS) are defined in JavaConstants.thrift. 
- NUM_NODES + NUM_NODES, + + // maximum length of the scan range; only applicable to HDFS scan range; Unspecified or + // a length of 0 indicates backend default; + MAX_SCAN_RANGE_LENGTH, + + // file buffer size used by text parsing; size of 0 indicates the backend's default + // file buffer size + FILE_BUFFER_SIZE } // For all rpc that return a TStatus as part of their result type, diff --git a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java index c676d5451..5700ee7c7 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java @@ -224,9 +224,10 @@ public class HBaseScanNode extends ScanNode { } @Override - public void getScanParams( + public void getScanParams(long maxScanRangeLength, int numPartitions, List scanRanges, List hostPorts) { - // No usage of NUM_NODES_ALL_RACKS yet. The condition on numPartition depends on this check. + // No usage of NUM_NODES_ALL_RACKS yet. The condition on numPartition depends on this + // check. Preconditions.checkState(numPartitions != Constants.NUM_NODES_ALL_RACKS); // Retrieve relevant HBase regions and their region servers diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java index fc2bbc7a0..0a97b670e 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java @@ -191,7 +191,7 @@ public class HdfsScanNode extends ScanNode { } @Override - public void getScanParams( + public void getScanParams(long maxScanRangeLength, int numNodes, List scanRanges, List hostPorts) { Preconditions.checkState(numNodes != Constants.NUM_NODES_ALL_RACKS); @@ -209,13 +209,22 @@ public class HdfsScanNode extends ScanNode { TScanRange scanRange = new TScanRange(id); for (HdfsTable.BlockMetadata metadata: blockAssignment.blockMetadata) { BlockLocation blockLocation = metadata.getLocation(); - THdfsFileSplit fileSplit = - new THdfsFileSplit(metadata.fileName, - blockLocation.getOffset(), - blockLocation.getLength(), - metadata.getPartition().getId()); - - scanRange.addToHdfsFileSplits(fileSplit); + long currentOffset = blockLocation.getOffset(); + long remainingLength = blockLocation.getLength(); + while (remainingLength > 0) { + long currentLength = remainingLength; + if (maxScanRangeLength > 0 && remainingLength > maxScanRangeLength) { + currentLength = maxScanRangeLength; + } + THdfsFileSplit fileSplit = + new THdfsFileSplit(metadata.fileName, + currentOffset, + currentLength, + metadata.getPartition().getId()); + scanRange.addToHdfsFileSplits(fileSplit); + remainingLength -= currentLength; + currentOffset += currentLength; + } } scanRanges.add(scanRange); if (hostPorts != null) { diff --git a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java index 4efa02967..94f60b0a2 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/Planner.java @@ -683,27 +683,25 @@ public class Planner { } /** - * Given an analysisResult, creates a sequence of plan fragments that implement the query. + * Given an analysisResult, creates a sequence of plan fragments that implement the + * query. 
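
For the HdfsScanNode.getScanParams() change above: a block is now chopped into scan ranges of at most max_scan_range_length bytes (a value <= 0 keeps one range per block). The real code is Java and emits THdfsFileSplit objects; the arithmetic is sketched here in C++ purely for illustration:

#include <cstdint>
#include <cstdio>
#include <utility>
#include <vector>

// Splits one HDFS block, given as (offset, length), into scan ranges of at most
// max_scan_range_length bytes each. Mirrors the loop added to getScanParams().
std::vector<std::pair<int64_t, int64_t>> SplitBlock(
    int64_t offset, int64_t length, int64_t max_scan_range_length) {
  std::vector<std::pair<int64_t, int64_t>> ranges;
  while (length > 0) {
    int64_t len = length;
    if (max_scan_range_length > 0 && len > max_scan_range_length) {
      len = max_scan_range_length;
    }
    ranges.emplace_back(offset, len);
    offset += len;
    length -= len;
  }
  return ranges;
}

int main() {
  // A 38-byte block split with max_scan_range_length = 16 gives (0,16), (16,16), (32,6);
  // with max_scan_range_length = 0 it stays a single range (0,38).
  for (const auto& r : SplitBlock(0, 38, 16)) {
    std::printf("(%lld, %lld)\n", static_cast<long long>(r.first),
                static_cast<long long>(r.second));
  }
  return 0;
}

Together with the scanner changes, each row is then produced by exactly one of these ranges: a range that starts mid-row skips forward to the next tuple delimiter, and the range that owns the row's start reads past its end to finish it.
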
* * @param analysisResult * result of query analysis - * @param numNodes - * number of nodes on which to execute fragments; same semantics as - * TQueryRequest.numNodes; - * allowed values: - * 1: single-node execution - * NUM_NODES_ALL: executes on all nodes that contain relevant data - * NUM_NODES_ALL_RACKS: executes on one node per rack that holds relevant data - * > 1: executes on at most that many nodes at any point in time (ie, there - * can be more nodes than numNodes with plan fragments for this query, but - * at most numNodes would be active at any point in time) + * @param queryOptions + * user specified query options; only num_nodes and max_scan_range_length are + * used. * @param explainString output parameter of the explain plan string, if not null * @return query exec request containing plan fragments and all execution parameters */ public TQueryExecRequest createPlanFragments( - AnalysisContext.AnalysisResult analysisResult, int numNodes, + AnalysisContext.AnalysisResult analysisResult, TQueryOptions queryOptions, StringBuilder explainString) throws NotImplementedException, InternalException { + // Only num_nodes and max_scan_range_length are used + int numNodes = queryOptions.num_nodes; + long maxScanRangeLength = queryOptions.max_scan_range_length; + // Set queryStmt from analyzed SELECT or INSERT query. QueryStmt queryStmt = null; if (analysisResult.isInsertStmt()) { @@ -782,10 +780,11 @@ public class Planner { List scanRanges = Lists.newArrayList(); List dataLocations = Lists.newArrayList(); if (numNodes == 1) { - createPartitionParams(root, 1, scanRanges, dataLocations); + createPartitionParams(root, 1, maxScanRangeLength, scanRanges, dataLocations); root.collectSubclasses(ScanNode.class, scans); } else { - createPartitionParams(slave, numNodes, scanRanges, dataLocations); + createPartitionParams(slave, numNodes, maxScanRangeLength, scanRanges, + dataLocations); slave.collectSubclasses(ScanNode.class, scans); ExchangeNode exchangeNode = root.findFirstOf(ExchangeNode.class); exchangeNode.setNumSenders(dataLocations.size()); @@ -832,7 +831,7 @@ public class Planner { // and doesn't send the output anywhere) request.addToNode_request_params(Lists.newArrayList(new TPlanExecParams())); } - createExecParams(request, scans, scanRanges, dataLocations); + createExecParams(request, scans, maxScanRangeLength, scanRanges, dataLocations); // Build the explain plan string, if requested if (explainString != null) { @@ -1014,9 +1013,10 @@ public class Planner { } /** - * Compute partitioning parameters (scan ranges and host / ports) for leftmost scan of plan. + * Compute partitioning parameters (scan ranges and host / ports) for leftmost scan of + * plan. */ - private void createPartitionParams(PlanNode plan, int numNodes, + private void createPartitionParams(PlanNode plan, int numNodes, long maxScanRangeLength, List scanRanges, List dataLocations) { ScanNode leftmostScan = getLeftmostScan(plan); if (leftmostScan == null) { @@ -1034,7 +1034,8 @@ public class Planner { } else { numPartitions = 1; } - leftmostScan.getScanParams(numPartitions, scanRanges, dataLocations); + leftmostScan.getScanParams(maxScanRangeLength, numPartitions, scanRanges, + dataLocations); if (scanRanges.isEmpty() && dataLocations.isEmpty()) { // if we're scanning an empty table we still need a single // host to execute the scan @@ -1062,7 +1063,7 @@ public class Planner { * scanRanges and data locations. 
*/ private void createExecParams( - TQueryExecRequest request, ArrayList scans, + TQueryExecRequest request, ArrayList scans, long maxScanRangeLength, List scanRanges, List dataLocations) { // one TPlanExecParams per fragment/scan range; // we need to add an "empty" range for empty tables (in which case @@ -1086,7 +1087,7 @@ public class Planner { for (int i = 1; i < scans.size(); ++i) { ScanNode scan = scans.get(i); scanRanges = Lists.newArrayList(); - scan.getScanParams(1, scanRanges, null); + scan.getScanParams(maxScanRangeLength, 1, scanRanges, null); Preconditions.checkState(scanRanges.size() <= 1); if (!scanRanges.isEmpty()) { for (TPlanExecParams fragmentParams: fragmentParamsList) { diff --git a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java index 6fc25b581..f87c64fef 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java @@ -38,6 +38,8 @@ abstract public class ScanNode extends PlanNode { * Returns one TScanRange per partition of the scan. If 'hostports' is non-null and * there are multiple partitions, also returns locations on which scan ranges are * located, one per range. + * @param maxScanRangeLength the maximum number of bytes each scan range should scan; + * only applicable to HDFS; less than or equal to zero means no maximum * @param numPartitions number of scan partitions; same semantics as * TQueryRequest.numNodes; must be >= 1 or one of these special values: * NUM_NODES_ALL: as many partitions as there are nodes that contain relevant data @@ -46,7 +48,7 @@ abstract public class ScanNode extends PlanNode { * @param scanRanges output parameter * @param dataLocations output parameter */ - abstract public void getScanParams( + abstract public void getScanParams(long maxScanRangeLength, int numPartitions, List scanRanges, List dataLocations); @Override diff --git a/fe/src/main/java/com/cloudera/impala/service/Executor.java b/fe/src/main/java/com/cloudera/impala/service/Executor.java index 7192dada4..2673421b3 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Executor.java +++ b/fe/src/main/java/com/cloudera/impala/service/Executor.java @@ -29,9 +29,9 @@ import com.cloudera.impala.analysis.Expr; import com.cloudera.impala.analysis.InsertStmt; import com.cloudera.impala.analysis.QueryStmt; import com.cloudera.impala.catalog.Catalog; +import com.cloudera.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException; import com.cloudera.impala.catalog.HdfsTable; import com.cloudera.impala.catalog.PrimitiveType; -import com.cloudera.impala.catalog.HdfsStorageDescriptor.InvalidStorageDescriptorException; import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.planner.Planner; import com.cloudera.impala.thrift.TColumnValue; @@ -88,6 +88,16 @@ public class Executor { List errorLog, Map fileErrors, BlockingQueue resultQueue, InsertResult insertResult) throws ImpalaException { + runQuery(request, colTypes, colLabels, containsOrderBy, 0, batchSize, abortOnError, + maxErrors, disableCodegen, errorLog, fileErrors, resultQueue, insertResult); + } + + public void runQuery(TQueryRequest request, List colTypes, + List colLabels, AtomicBoolean containsOrderBy, int file_buffer_size, + int batchSize, boolean abortOnError, int maxErrors, boolean disableCodegen, + List errorLog, Map fileErrors, + BlockingQueue resultQueue, + InsertResult insertResult) throws ImpalaException { init(); LOG.info("query: " + 
request.stmt); AnalysisContext.AnalysisResult analysisResult = @@ -96,7 +106,9 @@ public class Executor { containsOrderBy.set(analysisResult.isQueryStmt() && analysisResult.getQueryStmt().hasOrderByClause()); } - execQuery(analysisResult, request.queryOptions.num_nodes, batchSize, abortOnError, + execQuery(analysisResult, request.queryOptions.num_nodes, + request.queryOptions.max_scan_range_length, + file_buffer_size, batchSize, abortOnError, maxErrors, disableCodegen, errorLog, fileErrors, request.queryOptions.return_as_ascii, resultQueue, insertResult); addSentinelRow(resultQueue); @@ -127,7 +139,10 @@ public class Executor { Runnable execCall = new Runnable() { public void run() { try { - execQuery(analysisResult, request.queryOptions.num_nodes, batchSize, + execQuery(analysisResult, request.queryOptions.num_nodes, + request.queryOptions.max_scan_range_length, + request.queryOptions.file_buffer_size, + batchSize, abortOnError, maxErrors, disableCodegen, errorLog, fileErrors, request.queryOptions.return_as_ascii, resultQueue, insertResult); } catch (ImpalaException e) { @@ -195,6 +210,7 @@ public class Executor { // strings. private void execQuery( AnalysisContext.AnalysisResult analysisResult, int numNodes, + long maxScanRangeLength, int file_buffer_size, int batchSize, boolean abortOnError, int maxErrors, boolean disableCodegen, List errorLog, Map fileErrors, boolean returnAsAscii, BlockingQueue resultQueue, InsertResult insertResult) @@ -202,9 +218,13 @@ public class Executor { // create plan fragments Planner planner = new Planner(); StringBuilder explainString = new StringBuilder(); + TQueryOptions options = new TQueryOptions(); + options.setNum_nodes(numNodes); + options.setMax_scan_range_length(maxScanRangeLength); + // for now, only single-node execution TQueryExecRequest execRequest = planner.createPlanFragments( - analysisResult, numNodes, explainString); + analysisResult, options, explainString); // Log explain string. 
LOG.info(explainString.toString()); @@ -235,6 +255,7 @@ public class Executor { planExecRequest.getQuery_options().setMax_errors(maxErrors); planExecRequest.getQuery_options().setDisable_codegen(disableCodegen); planExecRequest.getQuery_options().setBatch_size(batchSize); + planExecRequest.getQuery_options().setFile_buffer_size(file_buffer_size); } // set dest_fragment_ids of non-coord fragment diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java index facbb4066..6c714b5ad 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java @@ -120,7 +120,7 @@ public class Frontend { Planner planner = new Planner(); TCreateQueryExecRequestResult result = new TCreateQueryExecRequestResult(); result.setQueryExecRequest( - planner.createPlanFragments(analysisResult, request.queryOptions.num_nodes, + planner.createPlanFragments(analysisResult, request.queryOptions, explainString)); result.queryExecRequest.sql_stmt = request.stmt; diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java index 357fc3645..edbefd8dc 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java +++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java @@ -25,6 +25,7 @@ import com.cloudera.impala.testutil.TestFileParser.Section; import com.cloudera.impala.testutil.TestFileParser.TestCase; import com.cloudera.impala.testutil.TestUtils; import com.cloudera.impala.thrift.Constants; +import com.cloudera.impala.thrift.TQueryOptions; import com.google.common.collect.Lists; public class PlannerTest { @@ -58,7 +59,10 @@ public class PlannerTest { Planner planner = new Planner(); planner.setExplainPlanDetailLevel(level); explainStringBuilder.setLength(0); - planner.createPlanFragments(analysisResult, numNodes, explainStringBuilder); + TQueryOptions options = new TQueryOptions(); + options.setNum_nodes(numNodes); + planner.createPlanFragments(analysisResult, options, explainStringBuilder); + String explainStr = explainStringBuilder.toString(); actualOutput.append(explainStr); @@ -88,8 +92,9 @@ public class PlannerTest { AnalysisContext.AnalysisResult analysisResult = analysisCtxt.analyze(query); Planner planner = new Planner(); explainStringBuilder.setLength(0); - - planner.createPlanFragments(analysisResult, numNodes, explainStringBuilder); + TQueryOptions options = new TQueryOptions(); + options.setNum_nodes(numNodes); + planner.createPlanFragments(analysisResult, options, explainStringBuilder); errorLog.append( "query produced a plan\nquery=" + query + "\nplan=\n" diff --git a/fe/src/test/java/com/cloudera/impala/service/BaseQueryTest.java b/fe/src/test/java/com/cloudera/impala/service/BaseQueryTest.java index 3c129eaf8..06f5462e2 100644 --- a/fe/src/test/java/com/cloudera/impala/service/BaseQueryTest.java +++ b/fe/src/test/java/com/cloudera/impala/service/BaseQueryTest.java @@ -139,14 +139,20 @@ public abstract class BaseQueryTest { public String getTableSuffix() { return tableSuffix; } } - static private class TestConfiguration { + static protected class TestConfiguration { private final CompressionFormat compressionFormat; private final TableFormat tableFormat; private final TestExecContext execContext; public TestConfiguration(int nodes, int batchSize, CompressionFormat compression, TableFormat tableFormat, boolean disableLlvm) { - this.execContext = new 
TestExecContext(nodes, batchSize, disableLlvm, false, 1000); + this(new TestExecContext(nodes, batchSize, disableLlvm, false, 1000), compression, + tableFormat); + } + + public TestConfiguration(TestExecContext execContext, CompressionFormat compression, + TableFormat tableFormat) { + this.execContext = execContext; this.compressionFormat = compression; this.tableFormat = tableFormat; } @@ -155,6 +161,7 @@ public abstract class BaseQueryTest { return tableFormat.getTableSuffix() + compressionFormat.getTableSuffix(); } + public TestExecContext getTestExecContext() { return execContext; }; public CompressionFormat getCompressionFormat() { return compressionFormat; } public TableFormat getTableFormat() { return tableFormat; } public int getClusterSize() { return execContext.getNumNodes(); } @@ -428,7 +435,7 @@ public abstract class BaseQueryTest { return true; } - private void runQueryWithTestConfigs(List testConfigs, + protected void runQueryWithTestConfigs(List testConfigs, String testFile, boolean abortOnError, int maxErrors) { String fileName = new File(testDirName, testFile + ".test").getPath(); TestFileParser queryFileParser = new TestFileParser(fileName); @@ -438,10 +445,7 @@ public abstract class BaseQueryTest { for (TestConfiguration config: testConfigs) { queryFileParser.parseFile(config.getTableSuffix()); - TestExecContext context = new TestExecContext( - config.getClusterSize(), config.getBatchSize(), config.getDisableLlvm(), - abortOnError, maxErrors); - runOneQueryTest(queryFileParser, config, context, new StringBuilder()); + runOneQueryTest(queryFileParser, config, new StringBuilder()); // Don't need to (or want to) run multiple test configurations if we are generating // new results. @@ -483,7 +487,7 @@ public abstract class BaseQueryTest { * Run a single query test file as specified in the queryFileParser. 
*/ private void runOneQueryTest(TestFileParser queryFileParser, TestConfiguration config, - TestExecContext context, StringBuilder errorLog) { + StringBuilder errorLog) { List results = Lists.newArrayList(); for (TestCase testCase: queryFileParser.getTestCases()) { @@ -506,7 +510,8 @@ public abstract class BaseQueryTest { Section.QUERY, false, " ", config.getTableSuffix()); QueryExecTestResult result = TestUtils.runQueryUsingExecutor(getTargetExecutor(), - queryString, context, testCase.getStartingLineNum(), expectedResult, errorLog); + queryString, config.getTestExecContext(), testCase.getStartingLineNum(), + expectedResult, errorLog); if(GENERATE_NEW_TEST_RESULTS) { result.getQuery().addAll(expectedResult.getQuery()); diff --git a/fe/src/test/java/com/cloudera/impala/service/QueryTest.java b/fe/src/test/java/com/cloudera/impala/service/QueryTest.java index 93fd09264..bb9d6f92a 100644 --- a/fe/src/test/java/com/cloudera/impala/service/QueryTest.java +++ b/fe/src/test/java/com/cloudera/impala/service/QueryTest.java @@ -2,11 +2,16 @@ package com.cloudera.impala.service; +import java.util.List; + import org.junit.Test; +import com.cloudera.impala.testutil.TestExecContext; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; public class QueryTest extends BaseQueryTest { + @Test public void TestDistinct() { runTestInExecutionMode(EXECUTION_MODE, "distinct", false, 1000); @@ -73,4 +78,28 @@ public class QueryTest extends BaseQueryTest { public void TestMixedFormat() { runTestInExecutionMode(EXECUTION_MODE, "mixed-format", false, 1000); } + + @Test + public void TestHdfsTinyScan() { + // We use very small scan ranges to exercise corner cases in the HDFS scanner more + // thoroughly. In particular, it will exercise: + // 1. scan range with no tuple + // 2. tuple that span across multiple scan ranges + TestExecContext execContext1 = + new TestExecContext(2, 1, true, false, 1000, 1, 0); + + // We use a very small file buffer to test the HDFS scanner init code that seeks the + // first tuple delimiter. + TestExecContext execContext2 = + new TestExecContext(2, 1, true, false, 1000, 5, 1); + + List testConfigs = Lists.newArrayList(); + testConfigs.add( + new TestConfiguration(execContext1, CompressionFormat.NONE, TableFormat.TEXT)); + testConfigs.add( + new TestConfiguration(execContext2, CompressionFormat.NONE, TableFormat.TEXT)); + + runQueryWithTestConfigs(testConfigs, "hdfs-tiny-scan", false, 1000); + } + } diff --git a/fe/src/test/java/com/cloudera/impala/testutil/TestExecContext.java b/fe/src/test/java/com/cloudera/impala/testutil/TestExecContext.java index 190b54dc1..be9814d39 100644 --- a/fe/src/test/java/com/cloudera/impala/testutil/TestExecContext.java +++ b/fe/src/test/java/com/cloudera/impala/testutil/TestExecContext.java @@ -6,6 +6,7 @@ import com.google.common.base.Objects; /* * Describes execution details for a query. 
+ * TODO: replace it with TQueryOptions */ public class TestExecContext { private final boolean abortOnError; @@ -13,14 +14,24 @@ public class TestExecContext { private final boolean disableCodegen; private final int maxErrors; private final int numNodes; + private final long maxScanRangeLength; + private final int fileBufferSize; public TestExecContext(int numNodes, int batchSize, boolean disableCodegen, - boolean abortOnError, int maxErrors) { + boolean abortOnError, int maxErrors, long maxScanRangeLength, + int fileBufferSize) { this.numNodes = numNodes; this.batchSize = batchSize; this.disableCodegen = disableCodegen; this.abortOnError = abortOnError; this.maxErrors = maxErrors; + this.maxScanRangeLength = maxScanRangeLength; + this.fileBufferSize = fileBufferSize; + } + + public TestExecContext(int numNodes, int batchSize, boolean disableCodegen, + boolean abortOnError, int maxErrors) { + this(numNodes, batchSize, disableCodegen, abortOnError, maxErrors, 0, 0); } public boolean getAbortOnError() { @@ -43,6 +54,14 @@ public class TestExecContext { return numNodes; } + public long getMaxScanRangeLength() { + return maxScanRangeLength; + } + + public int getFileBufferSize() { + return fileBufferSize; + } + @Override public String toString() { return Objects.toStringHelper(this).add("NumNodes", numNodes) @@ -50,6 +69,8 @@ public class TestExecContext { .add("IsCodegenDisabled", disableCodegen) .add("AbortOnError", abortOnError) .add("MaxErrors", maxErrors) + .add("MaxScanRangeLength", maxScanRangeLength) + .add("FileBufferSize", fileBufferSize) .toString(); } } \ No newline at end of file diff --git a/fe/src/test/java/com/cloudera/impala/testutil/TestUtils.java b/fe/src/test/java/com/cloudera/impala/testutil/TestUtils.java index 5e76f7ac9..749603168 100644 --- a/fe/src/test/java/com/cloudera/impala/testutil/TestUtils.java +++ b/fe/src/test/java/com/cloudera/impala/testutil/TestUtils.java @@ -356,6 +356,7 @@ public class TestUtils { TQueryOptions options = new TQueryOptions(); options.setReturn_as_ascii(true); options.setNum_nodes(context.getNumNodes()); + options.setMax_scan_range_length(context.getMaxScanRangeLength()); TQueryRequest request = new TQueryRequest(query, options); ArrayList errors = new ArrayList(); SortedMap fileErrors = new TreeMap(); @@ -367,6 +368,7 @@ public class TestUtils { QueryExecTestResult actualExecResults = new QueryExecTestResult(); try { inProcessExecutor.runQuery(request, colTypes, colLabels, containsOrderBy, + context.getFileBufferSize(), context.getBatchSize(), context.getAbortOnError(), context.getMaxErrors(), context.isCodegenDisabled(), errors, fileErrors, resultQueue, insertResult); diff --git a/testdata/TinyTable/data.csv b/testdata/TinyTable/data.csv new file mode 100644 index 000000000..d927d3aa1 --- /dev/null +++ b/testdata/TinyTable/data.csv @@ -0,0 +1,3 @@ +aaaaaaa,bbbbbbb +ccccc,dddd +eeeeeeee,f diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index 3f9992746..a45e57603 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -965,3 +965,22 @@ ALTER TABLE %(table_name)s ADD PARTITION (string_col = "partition1"); ---- ---- ==== +functional +---- +tinytable +---- +CREATE EXTERNAL TABLE %(table_name)s ( + a string, + b string) +row format delimited fields terminated by ',' +stored as %(file_format)s +LOCATION '${hiveconf:hive.metastore.warehouse.dir}/%(table_name)s'; +---- +FROM 
%(base_table_name)s INSERT OVERWRITE TABLE %(table_name)s SELECT *; +---- +${IMPALA_HOME}/bin/run-query.sh --query=" \ + INSERT OVERWRITE TABLE %(table_name)s \ + select * FROM %(base_table_name)s" +---- +LOAD DATA LOCAL INPATH '${env:IMPALA_HOME}/testdata/TinyTable/data.csv' OVERWRITE INTO TABLE %(table_name)s; +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test new file mode 100644 index 000000000..ce1bdf11d --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test @@ -0,0 +1,16 @@ +select * from tinytable +---- TYPES +string, string +---- RESULTS +'aaaaaaa','bbbbbbb' +'ccccc','dddd' +'eeeeeeee','f' +==== +select a from tinytable +---- TYPES +string +---- RESULTS +'aaaaaaa' +'ccccc' +'eeeeeeee' +==== \ No newline at end of file diff --git a/testdata/workloads/tpch/queries/tpch-q1.test b/testdata/workloads/tpch/queries/tpch-q1.test index 3d8735fa7..f4b095427 100644 --- a/testdata/workloads/tpch/queries/tpch-q1.test +++ b/testdata/workloads/tpch/queries/tpch-q1.test @@ -20,7 +20,7 @@ group by ---- TYPES string, string, double, double, double, double, double, double, double, bigint ---- RESULTS -'A','F',37734075,56586511539.3,53758218131,55909025048.8,25.5,38273.1,0,1478492 +'A','F',37734107,56586554400.7,53758257134.9,55909065222.8,25.5,38273.1,0,1478493 'N','F',991417,1487504710.4,1413082168.1,1469649223.2,25.5,38284.5,0.1,38854 'N','O',74476040,111701729697.7,106118230307.6,110367043872.5,25.5,38249.1,0,2920374 'R','F',37719753,56568041380.9,53741292684.6,55889619119.8,25.5,38250.9,0.1,1478870