From b9ea32e9b77a3aa2fe2f4538345cb662e3d7330d Mon Sep 17 00:00:00 2001 From: Skye Wanderman-Milne Date: Wed, 14 Aug 2013 19:07:03 -0700 Subject: [PATCH] Fix IMPALA-129, IMPALA-534, and other scanner bugs. Change-Id: Idbd29af3fcc35b9e1173d08ac55b5780751c5938 Reviewed-on: http://gerrit.ent.cloudera.com:8080/196 Tested-by: jenkins Reviewed-by: Skye Wanderman-Milne Tested-by: Skye Wanderman-Milne --- be/src/exec/base-sequence-scanner.cc | 156 ++++------ be/src/exec/base-sequence-scanner.h | 31 +- be/src/exec/delimited-text-parser-test.cc | 2 +- be/src/exec/delimited-text-parser.cc | 29 +- be/src/exec/hdfs-avro-scanner.cc | 8 +- be/src/exec/hdfs-parquet-scanner.cc | 21 +- be/src/exec/hdfs-rcfile-scanner.cc | 221 +++++++------- be/src/exec/hdfs-rcfile-scanner.h | 8 +- be/src/exec/hdfs-scanner.cc | 11 +- be/src/exec/hdfs-sequence-scanner.cc | 151 ++++------ be/src/exec/hdfs-sequence-scanner.h | 7 +- be/src/exec/hdfs-text-scanner.cc | 15 +- be/src/exec/scanner-context.cc | 278 +++++++++--------- be/src/exec/scanner-context.h | 91 +++--- be/src/exec/scanner-context.inline.h | 60 ++-- be/src/runtime/disk-io-mgr.cc | 2 + be/src/runtime/row-batch.h | 1 + .../queries/QueryTest/hdfs-tiny-scan.test | 15 +- tests/query_test/test_scan_range_lengths.py | 3 - 19 files changed, 504 insertions(+), 606 deletions(-) diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index 62d196927..9dbb39502 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -48,13 +48,11 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNode* scan_node, return Status::OK; } -BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* state, - bool marker_precedes_sync) +BaseSequenceScanner::BaseSequenceScanner(HdfsScanNode* node, RuntimeState* state) : HdfsScanner(node, state), header_(NULL), block_start_(0), - data_buffer_pool_(new MemPool(state->mem_limits())), - marker_precedes_sync_(marker_precedes_sync) { + data_buffer_pool_(new MemPool(state->mem_limits())) { } BaseSequenceScanner::~BaseSequenceScanner() { @@ -109,29 +107,23 @@ Status BaseSequenceScanner::ProcessSplit() { finished_ = false; RETURN_IF_ERROR(InitNewRange()); - Status status; + Status status = Status::OK; - // Find the first record - if (stream_->scan_range()->offset() == 0) { - // scan range that starts at the beginning of the file, just skip ahead by - // the header size. - if (!stream_->SkipBytes(header_->header_size, &status)) return status; - } else { - status = SkipToSync(header_->sync, SYNC_HASH_SIZE); - if (stream_->eosr()) { - // We don't care about status here -- OK if we can't find the sync but - // we're at the end of the scan range - return Status::OK; - } - RETURN_IF_ERROR(status); + // Skip to the first record + if (stream_->file_offset() < header_->header_size) { + // If the scan range starts within the header, skip to the end of the header so we + // don't accidentally skip to an extra sync within the header + RETURN_IF_FALSE(stream_->SkipBytes( + header_->header_size - stream_->file_offset(), &parse_status_)); } + RETURN_IF_ERROR(SkipToSync(header_->sync, SYNC_HASH_SIZE)); // Process Range. int64_t first_error_offset = 0; int num_errors = 0; // We can continue through errors by skipping to the next SYNC hash. - do { + while (!finished_) { status = ProcessRange(); if (status.IsCancelled()) return status; // Save the offset of any error. 
@@ -155,20 +147,15 @@ Status BaseSequenceScanner::ProcessSplit() { // If no errors or we abort on error then exit loop, otherwise try to recover. if (state_->abort_on_error() || status.ok()) break; - if (!stream_->eosr()) { - parse_status_ = Status::OK; - ++num_errors; - // Recover by skipping to the next sync. - int64_t error_offset = stream_->file_offset(); - status = SkipToSync(header_->sync, SYNC_HASH_SIZE); - COUNTER_UPDATE(bytes_skipped_counter_, stream_->file_offset() - error_offset); - if (status.IsCancelled()) return status; - if (stream_->eosr()) break; - - // An error status is explicitly ignored here so we can skip over bad blocks. - // We will continue through this loop again looking for the next sync. - } - } while (!stream_->eosr()); + // Recover by skipping to the next sync. + parse_status_ = Status::OK; + ++num_errors; + int64_t error_offset = stream_->file_offset(); + status = SkipToSync(header_->sync, SYNC_HASH_SIZE); + COUNTER_UPDATE(bytes_skipped_counter_, stream_->file_offset() - error_offset); + RETURN_IF_ERROR(status); + DCHECK(parse_status_.ok()); + } if (num_errors != 0 || !status.ok()) { if (state_->LogHasSpace()) { @@ -192,9 +179,8 @@ Status BaseSequenceScanner::ReadSync() { uint8_t* hash; int out_len; - bool eos; RETURN_IF_FALSE( - stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &eos, &parse_status_)); + stream_->GetBytes(SYNC_HASH_SIZE, &hash, &out_len, &parse_status_)); if (out_len != SYNC_HASH_SIZE || memcmp(hash, header_->sync, SYNC_HASH_SIZE)) { if (state_->LogHasSpace()) { stringstream ss; @@ -210,24 +196,14 @@ Status BaseSequenceScanner::ReadSync() { } return Status("bad sync hash block"); } - // TODO: finished_ |= end of file (this will prevent us from reading off - // the end of the file) + finished_ |= stream_->eof(); return Status::OK; } int BaseSequenceScanner::FindSyncBlock(const uint8_t* buffer, int buffer_len, const uint8_t* sync, int sync_len) { - StringValue needle; - char marker_and_sync[4 + sync_len]; - if (marker_precedes_sync_) { - marker_and_sync[0] = marker_and_sync[1] = - marker_and_sync[2] = marker_and_sync[3] = 0xff; - memcpy(marker_and_sync + 4, sync, sync_len); - needle = StringValue(marker_and_sync, 4 + sync_len); - } else { - char* sync_str = reinterpret_cast(const_cast(sync)); - needle = StringValue(sync_str, sync_len); - } + char* sync_str = reinterpret_cast(const_cast(sync)); + StringValue needle = StringValue(sync_str, sync_len); StringValue haystack( const_cast(reinterpret_cast(buffer)), buffer_len); @@ -238,9 +214,6 @@ int BaseSequenceScanner::FindSyncBlock(const uint8_t* buffer, int buffer_len, if (offset != -1) { // Advance offset past sync offset += sync_len; - if (marker_precedes_sync_) { - offset += 4; - } } return offset; } @@ -250,57 +223,58 @@ Status BaseSequenceScanner::SkipToSync(const uint8_t* sync, int sync_size) { int offset = -1; uint8_t* buffer; int buffer_len; - bool eosr; Status status; - - // A sync marker can span multiple buffers. In that case, we use this staging - // buffer to combine bytes from the buffers. - // The -1 marker (if present) and the sync can start anywhere in the last 19 bytes - // of the buffer, so we save the 19-byte tail of the buffer. 
- int tail_size = sync_size + sizeof(int32_t) - 1; - uint8_t split_buffer[2 * tail_size]; - // Read buffers until we find a sync or reach end of scan range - RETURN_IF_ERROR(stream_->GetRawBytes(&buffer, &buffer_len, &eosr)); - while (true) { - // Check if sync fully contained in current buffer + // Read buffers until we find a sync or reach the end of the scan range. If we read all + // the buffers remaining in the scan range and none of them contain a sync (including a + // sync that starts at the end of this scan range and continues into the next one), then + // there are no more syncs in this scan range and we're finished. + while (!stream_->eosr()) { + // Check if there's a sync fully contained in the current buffer + RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_len)); offset = FindSyncBlock(buffer, buffer_len, sync, sync_size); DCHECK_LE(offset, buffer_len); if (offset != -1) break; - // It wasn't in the full buffer, copy the bytes at the end - int bytes_first_buffer = ::min(tail_size, buffer_len); - uint8_t* bp = buffer + buffer_len - bytes_first_buffer; - memcpy(split_buffer, bp, bytes_first_buffer); + // No sync found in the current buffer, so check if there's a sync spanning the + // current buffer and the next. First we skip so there are sync_size - 1 bytes left, + // then we read these bytes plus the first sync_size - 1 bytes of the next buffer. + // This guarantees that we find any syncs that start in the current buffer and end in + // the next buffer. + int to_skip = max(0, buffer_len - (sync_size - 1)); + RETURN_IF_FALSE(stream_->SkipBytes(to_skip, &parse_status_)); + // Peek so we don't advance stream_ into the next buffer. If we don't find a sync here + // then we'll need to check all of the next buffer, including the first sync_size -1 + // bytes. + RETURN_IF_FALSE(stream_->GetBytes( + (sync_size - 1) * 2, &buffer, &buffer_len, &parse_status_, true)); + offset = FindSyncBlock(buffer, buffer_len, sync, sync_size); + DCHECK_LE(offset, buffer_len); + if (offset != -1) break; - // Read the next buffer - if (!stream_->SkipBytes(buffer_len, &status)) return status; - RETURN_IF_ERROR(stream_->GetRawBytes(&buffer, &buffer_len, &eosr)); - - // Copy the first few bytes of the next buffer and check again. - int bytes_second_buffer = ::min(tail_size, buffer_len); - memcpy(split_buffer + bytes_first_buffer, buffer, bytes_second_buffer); - offset = FindSyncBlock(split_buffer, - bytes_first_buffer + bytes_second_buffer, sync, sync_size); - if (offset != -1) { - DCHECK_GE(offset, bytes_first_buffer); - // Adjust the offset to be relative to the start of the new buffer - offset -= bytes_first_buffer; - break; - } - - if (eosr) { - // No sync marker found in this scan range - return Status::OK; - } + // No sync starting in this buffer, so advance stream_ to the beginning of the next + // buffer. + RETURN_IF_ERROR(stream_->GetBuffer(false, &buffer, &buffer_len)); } - // We found a sync at offset. offset cannot be 0 since it points to the end of - // the sync in the current buffer. 
- DCHECK_GT(offset, 0); - if (!stream_->SkipBytes(offset, &status)) return status; + if (offset == -1) { + // No more syncs in this scan range + DCHECK(stream_->eosr()); + finished_ = true; + return Status::OK; + } + DCHECK_GE(offset, sync_size); + + // Make sure sync starts in our scan range + if (offset - sync_size >= stream_->bytes_left()) { + finished_ = true; + return Status::OK; + } + + RETURN_IF_FALSE(stream_->SkipBytes(offset, &parse_status_)); VLOG_FILE << "Found sync for: " << stream_->filename() << " at " << stream_->file_offset() - sync_size; + if (stream_->eof()) finished_ = true; return Status::OK; } diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h index 1c88d559d..430e3a0b7 100644 --- a/be/src/exec/base-sequence-scanner.h +++ b/be/src/exec/base-sequence-scanner.h @@ -65,7 +65,8 @@ class BaseSequenceScanner : public HdfsScanner { // Enum for compression type. THdfsCompression::type compression_type; - // Byte size of header + // Byte size of header. This must not include the sync directly preceding the data + // (even if the sync is considered part of the header in the file format spec). int64_t header_size; }; @@ -100,19 +101,19 @@ class BaseSequenceScanner : public HdfsScanner { // Returns type of scanner: e.g. rcfile, seqfile virtual THdfsFileFormat::type file_format() const = 0; - - // - marker_precedes_sync: if true, sync markers are preceded by 4 bytes of - // 0xFFFFFFFF. - BaseSequenceScanner(HdfsScanNode*, RuntimeState*, bool marker_precedes_sync); - - // Read and validate sync marker against header_->sync. Returns non-ok if the - // sync marker did not match. Scanners should always use this function to read - // sync markers, otherwise finished() might not be updated correctly. + + BaseSequenceScanner(HdfsScanNode*, RuntimeState*); + + // Read and validate sync marker against header_->sync. Returns non-ok if the sync + // marker did not match. Scanners should always use this function to read sync markers, + // otherwise finished() might not be updated correctly. If finished() returns true after + // calling this function, scanners must not process any more records. Status ReadSync(); - // Utility function to advance past the next sync marker, reading bytes from - // context_. - // - sync: sync marker (does not include 0xFFFFFFFF prefix) + // Utility function to advance past the next sync marker, reading bytes from stream_. + // If no sync is found in the scan range, return Status::OK and sets finished_ to + // true. It is safe to call this function past eosr. + // - sync: sync marker to search for (does not include 0xFFFFFFFF prefix) // - sync_size: number of bytes for sync Status SkipToSync(const uint8_t* sync, int sync_size); @@ -156,12 +157,10 @@ class BaseSequenceScanner : public HdfsScanner { // scan range must process the following block since the second scan range // cannot find the incomplete sync. context_->eosr() will not alert us to this // situation, causing the block to be skipped. - // TODO(skye): update other scanners to use finished() instead of eosr + // + // finished_ is set by ReadSync() and SkipToSync(). bool finished_; - // See constructor. - bool marker_precedes_sync_; - // Utility function to look for 'sync' in buffer. Returns the offset into // buffer of the _end_ of sync if it is found, otherwise, returns -1. 
int FindSyncBlock(const uint8_t* buffer, int buffer_len, const uint8_t* sync, diff --git a/be/src/exec/delimited-text-parser-test.cc b/be/src/exec/delimited-text-parser-test.cc index dccddd17c..aaad5c9b5 100644 --- a/be/src/exec/delimited-text-parser-test.cc +++ b/be/src/exec/delimited-text-parser-test.cc @@ -61,7 +61,7 @@ TEST(DelimitedTextParser, Escapes) { // The parser doesn't support this case. // TODO: update test when it is fixed - ValidateTupleStart(&escape_parser, "@|no_delims", 2, TUPLE_DELIM); + ValidateTupleStart(&escape_parser, "@|no_delims", -1, TUPLE_DELIM); } // TODO: expand test for other delimited text parser functions/cases. diff --git a/be/src/exec/delimited-text-parser.cc b/be/src/exec/delimited-text-parser.cc index 4ab7a420b..32f4347e3 100644 --- a/be/src/exec/delimited-text-parser.cc +++ b/be/src/exec/delimited-text-parser.cc @@ -84,13 +84,13 @@ DelimitedTextParser::DelimitedTextParser(HdfsScanNode* scan_node, // scan_node_ can be NULL in test setups if (scan_node_ == NULL) return; - + ParserReset(); num_cols_ = scan_node_->num_cols(); is_materialized_col_ = new bool[num_cols_]; for (int i = 0; i < num_cols_; ++i) { - is_materialized_col_[i] = + is_materialized_col_[i] = scan_node_->GetMaterializedSlotIdx(i) != HdfsScanNode::SKIP_COLUMN; } } @@ -108,13 +108,13 @@ void DelimitedTextParser::ParserReset() { // Parsing raw csv data into FieldLocation descriptors. Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remaining_len, - char** byte_buffer_ptr, char** row_end_locations, + char** byte_buffer_ptr, char** row_end_locations, FieldLocation* field_locations, int* num_tuples, int* num_fields, char** next_column_start) { // Start of this batch. *next_column_start = *byte_buffer_ptr; // If there was a '\r' at the end of the last batch, set the offset to - // just before the begining. Otherwise make it invalid. + // just before the beginning. Otherwise make it invalid. if (last_row_delim_offset_ == 0) { last_row_delim_offset_ = remaining_len; } else { @@ -132,7 +132,7 @@ Status DelimitedTextParser::ParseFieldLocations(int max_tuples, int64_t remainin } if (*num_tuples == max_tuples) return Status::OK; - + // Handle the remaining characters while (remaining_len > 0) { bool new_tuple = false; @@ -205,7 +205,7 @@ int DelimitedTextParser::FindFirstInstance(const char* buffer, int len) { const char* buffer_start = buffer; bool found = false; - // If the last char in the previous buffer was \r then either return the start of + // If the last char in the previous buffer was \r then either return the start of // this buffer or skip a \n at the beginning of the buffer. if (last_row_delim_offset_ != -1) { if (*buffer_start == '\n') return 1; @@ -223,7 +223,7 @@ restart: xmm_buffer = _mm_loadu_si128(reinterpret_cast(buffer)); // This differs from ParseSse by using the slower cmpestrm instruction which // takes a chr_count and can search less than 16 bytes at a time. 
- xmm_tuple_mask = _mm_cmpestrm(xmm_tuple_search_, 1, xmm_buffer, + xmm_tuple_mask = _mm_cmpestrm(xmm_tuple_search_, 1, xmm_buffer, SSEUtil::CHARS_PER_128_BIT_REGISTER, SSEUtil::STRCHR_MODE); int tuple_mask = _mm_extract_epi16(xmm_tuple_mask, 0); if (tuple_mask != 0) { @@ -240,7 +240,7 @@ restart: tuple_start += SSEUtil::CHARS_PER_128_BIT_REGISTER; buffer += SSEUtil::CHARS_PER_128_BIT_REGISTER; } - } + } if (!found) { for (; tuple_start < len; ++tuple_start) { char c = *buffer++; @@ -252,6 +252,8 @@ restart: } } + if (!found) return -1; + if (escape_char_ != '\0') { // Scan backwards for escape characters. We do this after // finding the tuple break rather than during the (above) @@ -268,7 +270,7 @@ restart: break; } } - + // TODO: This sucks. All the preceding characters before the tuple delim were // escape characters. We need to read from the previous block to see what to do. if (before_tuple_end < 0) { @@ -277,24 +279,21 @@ restart: LOG(WARNING) << "Unhandled code path. This might cause a tuple to be " << "skipped or repeated."; warning_logged = true; - return tuple_start; } } // An even number of escape characters means they cancel out and this tuple break // is *not* escaped. - if (num_escape_chars % 2 != 0) { - goto restart; - } + if (num_escape_chars % 2 != 0) goto restart; } - if (!found) return -1; if (tuple_start == len - 1 && buffer_start[tuple_start] == '\r') { // If \r is the last char we need to wait to see if the next one is \n or not. last_row_delim_offset_ = 0; return -1; } - if (buffer_start[tuple_start] == '\n' && buffer_start[tuple_start - 1] == '\r') { + if (tuple_start < len && buffer_start[tuple_start] == '\n' && + buffer_start[tuple_start - 1] == '\r') { // We have \r\n, move to the next character. ++tuple_start; } diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc index 279316c80..a10b6ff5d 100644 --- a/be/src/exec/hdfs-avro-scanner.cc +++ b/be/src/exec/hdfs-avro-scanner.cc @@ -69,7 +69,7 @@ struct ScopedAvroSchemaT { }; HdfsAvroScanner::HdfsAvroScanner(HdfsScanNode* scan_node, RuntimeState* state) - : BaseSequenceScanner(scan_node, state, /* marker_precedes_sync */ false), + : BaseSequenceScanner(scan_node, state), avro_header_(NULL) { } @@ -111,7 +111,7 @@ Status HdfsAvroScanner::ReadFileHeader() { RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_)); memcpy(header_->sync, sync, SYNC_HASH_SIZE); - header_->header_size = stream_->total_bytes_returned(); + header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE; return Status::OK; } @@ -456,7 +456,7 @@ Status HdfsAvroScanner::InitNewRange() { } Status HdfsAvroScanner::ProcessRange() { - while (!finished() && !stream_->eof()) { + while (!finished()) { // Read new data block block_start_ = stream_->file_offset(); @@ -500,7 +500,7 @@ Status HdfsAvroScanner::ProcessRange() { // No slots to materialize (e.g. 
count(*)), no need to decode data int n = min(num_records, static_cast(max_tuples)); int num_to_commit = WriteEmptyTuples(context_, tuple_row, n); - if (num_to_commit > 0) CommitRows(num_to_commit); + CommitRows(num_to_commit); num_records -= n; COUNTER_UPDATE(scan_node_->rows_read_counter(), n); } else { diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 79b4fb297..c0033cfaf 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -65,12 +65,13 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNode* scan_node, // Compute the offset of the file footer DCHECK_GT(files[i]->file_length, 0); - int64_t footer_start = max(0L, files[i]->file_length - FOOTER_SIZE); + int64_t footer_size = min(static_cast(FOOTER_SIZE), files[i]->file_length); + int64_t footer_start = files[i]->file_length - FOOTER_SIZE; ScanRangeMetadata* metadata = reinterpret_cast(files[i]->splits[0]->meta_data()); DiskIoMgr::ScanRange* footer_range = scan_node->AllocateScanRange( - files[i]->filename.c_str(), FOOTER_SIZE, + files[i]->filename.c_str(), footer_size, footer_start, metadata->partition_id, files[i]->splits[0]->disk_id()); footer_ranges.push_back(footer_range); } @@ -235,7 +236,6 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() { uint8_t* buffer; int num_bytes; - bool eos; // We're about to move to the next data page. The previous data page is // now complete, pass along the memory allocated for it. @@ -246,9 +246,9 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() { // a data page was found). while (true) { DCHECK_EQ(num_buffered_values_, 0); - RETURN_IF_ERROR(stream_->GetRawBytes(&buffer, &num_bytes, &eos)); + RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &num_bytes)); if (num_bytes == 0) { - DCHECK(eos); + DCHECK(stream_->eosr()); break; } @@ -266,7 +266,7 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() { memcpy(header_buffer, buffer, header_first_part); if (!stream_->SkipBytes(header_first_part, &status)) return status; - RETURN_IF_ERROR(stream_->GetRawBytes(&buffer, &num_bytes, &eos)); + RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &num_bytes)); if (num_bytes == 0) return status; uint32_t header_second_part = @@ -611,12 +611,9 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) { while (true) { uint8_t* buffer; int len; - bool eos; - if (!stream_->GetBytes(0, &buffer, &len, &eos, &parse_status_)) { - return parse_status_; - } - DCHECK(eos); + RETURN_IF_ERROR(stream_->GetBuffer(false, &buffer, &len)); + DCHECK(stream_->eosr()); // Number of bytes in buffer after the fixed size footer is accounted for. 
int remaining_bytes_buffered = len - sizeof(int32_t) - sizeof(PARQUET_VERSION_NUMBER); @@ -680,7 +677,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) { num_tuples -= max_tuples; int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples); - if (num_to_commit > 0) CommitRows(num_to_commit); + CommitRows(num_to_commit); } *eosr = true; diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc index 4d529de55..c3db35d0c 100644 --- a/be/src/exec/hdfs-rcfile-scanner.cc +++ b/be/src/exec/hdfs-rcfile-scanner.cc @@ -52,7 +52,7 @@ const uint8_t HdfsRCFileScanner::RCFILE_VERSION_HEADER[4] = {'R', 'C', 'F', 1}; #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ HdfsRCFileScanner::HdfsRCFileScanner(HdfsScanNode* scan_node, RuntimeState* state) - : BaseSequenceScanner(scan_node, state, /* marker_precedes_sync */ true) { + : BaseSequenceScanner(scan_node, state) { } HdfsRCFileScanner::~HdfsRCFileScanner() { @@ -105,7 +105,7 @@ Status HdfsRCFileScanner::ReadFileHeader() { // Validate file version RETURN_IF_FALSE(stream_->ReadBytes( sizeof(RCFILE_VERSION_HEADER), &header, &parse_status_)); - if (!memcmp(header, HdfsSequenceScanner::SEQFILE_VERSION_HEADER, + if (!memcmp(header, HdfsSequenceScanner::SEQFILE_VERSION_HEADER, sizeof(HdfsSequenceScanner::SEQFILE_VERSION_HEADER))) { rc_header->version = SEQ6; } else if (!memcmp(header, RCFILE_VERSION_HEADER, sizeof(RCFILE_VERSION_HEADER))) { @@ -116,7 +116,7 @@ Status HdfsRCFileScanner::ReadFileHeader() { << ReadWriteUtil::HexDump(header, sizeof(RCFILE_VERSION_HEADER)) << "'"; return Status(ss.str()); } - + if (rc_header->version == SEQ6) { // Validate class name key/value uint8_t* class_name_key; @@ -183,8 +183,8 @@ Status HdfsRCFileScanner::ReadFileHeader() { uint8_t* sync; RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_)); memcpy(header_->sync, sync, SYNC_HASH_SIZE); - - header_->header_size = stream_->total_bytes_returned(); + + header_->header_size = stream_->total_bytes_returned() - SYNC_HASH_SIZE; return Status::OK; } @@ -259,20 +259,6 @@ Status HdfsRCFileScanner::ReadRowGroup() { row_group_buffer_size_ = row_group_length_; } RETURN_IF_ERROR(ReadColumnBuffers()); - if (stream_->eosr()) { - if (stream_->eof()) break; - - // We must read up to the next sync marker. - int32_t record_length; - RETURN_IF_FALSE(stream_->ReadInt(&record_length, &parse_status_)); - - if (record_length == HdfsRCFileScanner::SYNC_MARKER) { - // If the marker is there, it's an error not to have a Sync block following the - // Marker. - RETURN_IF_ERROR(ReadSync()); - break; - } - } } return Status::OK; } @@ -280,11 +266,6 @@ Status HdfsRCFileScanner::ReadRowGroup() { Status HdfsRCFileScanner::ReadRowGroupHeader() { int32_t record_length; RETURN_IF_FALSE(stream_->ReadInt(&record_length, &parse_status_)); - // The sync block is marked with a record_length of -1. 
- if (record_length == HdfsRCFileScanner::SYNC_MARKER) { - RETURN_IF_ERROR(ReadSync()); - RETURN_IF_FALSE(stream_->ReadInt(&record_length, &parse_status_)); - } if (record_length < 0) { stringstream ss; int64_t position = stream_->file_offset(); @@ -305,7 +286,7 @@ Status HdfsRCFileScanner::ReadRowGroupHeader() { stringstream ss; int64_t position = stream_->file_offset(); position -= sizeof(int32_t); - ss << "Bad compressed key length: " << compressed_key_length_ + ss << "Bad compressed key length: " << compressed_key_length_ << " at offset: " << position; return Status(ss.str()); } @@ -424,8 +405,8 @@ Status HdfsRCFileScanner::ReadColumnBuffers() { RETURN_IF_FALSE( stream_->SkipBytes(column.buffer_len, &parse_status_)); continue; - } - + } + // TODO: Stream through these column buffers instead of reading everything // in at once. DCHECK_LE(column.uncompressed_buffer_len + column.start_offset, row_group_length_); @@ -445,7 +426,7 @@ Status HdfsRCFileScanner::ReadColumnBuffers() { RETURN_IF_FALSE(stream_->ReadBytes( column.buffer_len, &uncompressed_data, &parse_status_)); // TODO: this is bad. Remove this copy. - memcpy(row_group_buffer_ + column.start_offset, + memcpy(row_group_buffer_ + column.start_offset, uncompressed_data, column.buffer_len); } } @@ -459,97 +440,109 @@ Status HdfsRCFileScanner::ProcessRange() { // materialized columns into a row group buffer. // It will then materialize tuples from the row group buffer. When the row // group is complete, it will move onto the next row group. - while (!stream_->eosr() || num_rows_ != row_pos_) { - if (num_rows_ == row_pos_) { - // Finished materializing this row group, read the next one. - RETURN_IF_ERROR(ReadRowGroup()); - if (num_rows_ == 0) break; - } - - SCOPED_TIMER(scan_node_->materialize_tuple_timer()); - - // Indicates whether the current row has errors. - bool error_in_row = false; - const vector& materialized_slots = - scan_node_->materialized_slots(); - vector::const_iterator it; - - // Materialize rows from this row group in row batch sizes - MemPool* pool; - Tuple* tuple; - TupleRow* current_row; - int max_tuples = GetMemory(&pool, &tuple, ¤t_row); - max_tuples = min(max_tuples, num_rows_ - row_pos_); + while (!finished()) { + DCHECK_EQ(num_rows_, row_pos_); + // Finished materializing this row group, read the next one. + RETURN_IF_ERROR(ReadRowGroup()); + if (num_rows_ == 0) break; - if (materialized_slots.empty()) { - // If there are no materialized slots (e.g. count(*) or just partition cols) - // we can shortcircuit the parse loop - row_pos_ += max_tuples; - int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples); - if (num_to_commit > 0) CommitRows(num_to_commit); + while (num_rows_ != row_pos_) { + SCOPED_TIMER(scan_node_->materialize_tuple_timer()); + + // Indicates whether the current row has errors. + bool error_in_row = false; + const vector& materialized_slots = + scan_node_->materialized_slots(); + vector::const_iterator it; + + // Materialize rows from this row group in row batch sizes + MemPool* pool; + Tuple* tuple; + TupleRow* current_row; + int max_tuples = GetMemory(&pool, &tuple, ¤t_row); + max_tuples = min(max_tuples, num_rows_ - row_pos_); + + if (materialized_slots.empty()) { + // If there are no materialized slots (e.g. 
count(*) or just partition cols) + // we can shortcircuit the parse loop + row_pos_ += max_tuples; + int num_to_commit = WriteEmptyTuples(context_, current_row, max_tuples); + CommitRows(num_to_commit); + COUNTER_UPDATE(scan_node_->rows_read_counter(), max_tuples); + continue; + } + + int num_to_commit = 0; + for (int i = 0; i < max_tuples; ++i) { + RETURN_IF_ERROR(NextRow()); + + // Initialize tuple from the partition key template tuple before writing the + // slots + InitTuple(template_tuple_, tuple); + + for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) { + const SlotDescriptor* slot_desc = *it; + int file_column_idx = slot_desc->col_pos() - scan_node_->num_partition_keys(); + + // Set columns missing in this file to NULL + if (file_column_idx >= columns_.size()) { + tuple->SetNull(slot_desc->null_indicator_offset()); + continue; + } + + ColumnInfo& column = columns_[file_column_idx]; + DCHECK(column.materialize_column); + + const char* col_start = reinterpret_cast( + row_group_buffer_ + column.start_offset + column.buffer_pos); + int field_len = column.current_field_len; + DCHECK_LE(col_start + field_len, + reinterpret_cast(row_group_buffer_ + row_group_length_)); + + if (!text_converter_->WriteSlot(slot_desc, tuple, col_start, field_len, + stream_->compact_data(), false, pool)) { + ReportColumnParseError(slot_desc, col_start, field_len); + error_in_row = true; + } + } + + if (error_in_row) { + error_in_row = false; + if (state_->LogHasSpace()) { + stringstream ss; + ss << "file: " << stream_->filename(); + state_->LogError(ss.str()); + } + if (state_->abort_on_error()) { + state_->ReportFileErrors(stream_->filename(), 1); + return Status(state_->ErrorLog()); + } + } + + current_row->SetTuple(scan_node_->tuple_idx(), tuple); + // Evaluate the conjuncts and add the row to the batch + if (ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, current_row)) { + ++num_to_commit; + current_row = next_row(current_row); + tuple = next_tuple(tuple); + } + } + CommitRows(num_to_commit); COUNTER_UPDATE(scan_node_->rows_read_counter(), max_tuples); - continue; + if (scan_node_->ReachedLimit()) return Status::OK; + if (context_->cancelled()) return Status::CANCELLED; } - int num_to_commit = 0; - for (int i = 0; i < max_tuples; ++i) { - RETURN_IF_ERROR(NextRow()); + // RCFiles don't end with syncs + if (stream_->eof()) return Status::OK; - // Initialize tuple from the partition key template tuple before writing the - // slots - InitTuple(template_tuple_, tuple); - - for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) { - const SlotDescriptor* slot_desc = *it; - int file_column_idx = slot_desc->col_pos() - scan_node_->num_partition_keys(); - - // Set columns missing in this file to NULL - if (file_column_idx >= columns_.size()) { - tuple->SetNull(slot_desc->null_indicator_offset()); - continue; - } - - ColumnInfo& column = columns_[file_column_idx]; - DCHECK(column.materialize_column); - - const char* col_start = reinterpret_cast( - row_group_buffer_ + column.start_offset + column.buffer_pos); - int field_len = column.current_field_len; - DCHECK_LE(col_start + field_len, - reinterpret_cast(row_group_buffer_ + row_group_length_)); - - if (!text_converter_->WriteSlot(slot_desc, tuple, - col_start, field_len, stream_->compact_data(), false, pool)) { - ReportColumnParseError(slot_desc, col_start, field_len); - error_in_row = true; - } - } - - if (error_in_row) { - error_in_row = false; - if (state_->LogHasSpace()) { - stringstream ss; - ss << "file: " << 
stream_->filename(); - state_->LogError(ss.str()); - } - if (state_->abort_on_error()) { - state_->ReportFileErrors(stream_->filename(), 1); - return Status(state_->ErrorLog()); - } - } - - current_row->SetTuple(scan_node_->tuple_idx(), tuple); - // Evaluate the conjuncts and add the row to the batch - if (ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, current_row)) { - ++num_to_commit; - current_row = next_row(current_row); - tuple = next_tuple(tuple); - } + // Check for sync by looking for the marker that precedes syncs. + int marker; + RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ true)); + if (marker == HdfsRCFileScanner::SYNC_MARKER) { + RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false)); + RETURN_IF_ERROR(ReadSync()); } - CommitRows(num_to_commit); - COUNTER_UPDATE(scan_node_->rows_read_counter(), max_tuples); - if (scan_node_->ReachedLimit()) break; - if (context_->cancelled()) return Status::CANCELLED; } return Status::OK; } @@ -557,7 +550,7 @@ Status HdfsRCFileScanner::ProcessRange() { void HdfsRCFileScanner::DebugString(int indentation_level, stringstream* out) const { // TODO: Add more details of internal state. *out << string(indentation_level * 2, ' ') - << "HdfsRCFileScanner(tupleid=" << scan_node_->tuple_idx() + << "HdfsRCFileScanner(tupleid=" << scan_node_->tuple_idx() << " file=" << stream_->filename(); // TODO: Scanner::DebugString // ExecNode::DebugString(indentation_level, out); diff --git a/be/src/exec/hdfs-rcfile-scanner.h b/be/src/exec/hdfs-rcfile-scanner.h index 7bbc27deb..efb3f3036 100644 --- a/be/src/exec/hdfs-rcfile-scanner.h +++ b/be/src/exec/hdfs-rcfile-scanner.h @@ -268,9 +268,7 @@ class HdfsRCFileScanner : public BaseSequenceScanner { // of columns. Other pieces of the metadata are ignored. Status ReadNumColumnsMetadata(); - // Read the rowgroup header - // Verifies: - // sync hash if pressent. + // Reads the rowgroup header starting after the sync. // Sets: // key_length_ // compressed_key_length_ @@ -312,9 +310,9 @@ class HdfsRCFileScanner : public BaseSequenceScanner { // cur_field_length_[col_idx] Status NextField(int col_idx); - // Read a row group into buffers. + // Read a row group (except for the sync marker and sync) into buffers. // Calls: - // ReadHeader + // ReadRowGroupHeader // ReadKeyBuffers // ReadColumnBuffers Status ReadRowGroup(); diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 26c20a550..0ab35b59c 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -162,11 +162,6 @@ void HdfsScanner::AddFinalRowBatch() { // 2. Eval conjuncts against the tuple. // 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch. int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) { - // Cap the number of result tuples up at the limit - if (scan_node_->limit() != -1) { - num_tuples = min(num_tuples, - static_cast(scan_node_->limit() - scan_node_->rows_returned())); - } DCHECK_GT(num_tuples, 0); if (template_tuple_ == NULL) { @@ -208,11 +203,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) { // 3. If it passes, stamp out 'num_tuples' copies of it into the row_batch. 
int HdfsScanner::WriteEmptyTuples(ScannerContext* context, TupleRow* row, int num_tuples) { - // Cap the number of result tuples up at the limit - if (scan_node_->limit() != -1) { - num_tuples = min(num_tuples, - static_cast(scan_node_->limit() - scan_node_->rows_returned())); - } + DCHECK_GE(num_tuples, 0); if (num_tuples == 0) return 0; if (!ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, row)) return 0; diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 3128d453b..fdeb96edb 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -38,7 +38,7 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state) - : BaseSequenceScanner(scan_node, state, /* marker_precedes_sync */ true), + : BaseSequenceScanner(scan_node, state), unparsed_data_buffer_(NULL), num_buffered_records_in_compressed_block_(0) { } @@ -106,39 +106,22 @@ BaseSequenceScanner::FileHeader* HdfsSequenceScanner::AllocateFileHeader() { } inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr, - int64_t* record_len, bool *eosr) { + int64_t* record_len) { // There are 2 cases: // Record-compressed -- like a regular record, but the data is compressed. // Uncompressed. block_start_ = stream_->file_offset(); - bool sync; - *eosr = stream_->eosr(); - Status stat = ReadBlockHeader(&sync); - if (!stat.ok()) { - *record_ptr = NULL; - if (*eosr) return Status::OK; - return stat; - } - - // If we read a sync mark and are past the end of the scan range we are done. - if (sync && *eosr) { - *record_ptr = NULL; - return Status::OK; - } - - // If we have not read the end the next sync mark keep going. - *eosr = false; + RETURN_IF_ERROR(ReadBlockHeader()); // We don't look at the keys, only the values. RETURN_IF_FALSE(stream_->SkipBytes(current_key_length_, &parse_status_)); if (header_->is_compressed) { int in_size = current_block_length_ - current_key_length_; - // Check for a reasonable size - if (in_size > stream_->scan_range()->len() || in_size < 0) { + if (in_size < 0) { stringstream ss; - ss << "Compressed record size is: " << in_size; + ss << "Invalid record size: " << in_size; if (state_->LogHasSpace()) state_->LogError(ss.str()); return Status(ss.str()); } @@ -165,9 +148,9 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr, } else { // Uncompressed records RETURN_IF_FALSE(stream_->ReadVLong(record_len, &parse_status_)); - if (*record_len > stream_->scan_range()->len() || *record_len < 0) { + if (*record_len < 0) { stringstream ss; - ss << "Record length is: " << record_len; + ss << "Invalid record length: " << *record_len; if (state_->LogHasSpace()) state_->LogError(ss.str()); return Status(ss.str()); } @@ -185,58 +168,42 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr, // a. Collect the start of records and their lengths // b. Parse cols locations to field_locations_ // c. Materialize those field locations to row batches +// 3. Read the sync indicator and check the sync block // This mimics the technique for text. // This function only returns on error or when the entire scan range is complete. 
Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() { DCHECK(header_->is_compressed); - while (!stream_->eosr() || num_buffered_records_in_compressed_block_ > 0) { + while (!finished()) { if (scan_node_->ReachedLimit()) return Status::OK; if (context_->cancelled()) return Status::CANCELLED; - if (num_buffered_records_in_compressed_block_ == 0) { - if (stream_->eosr()) return Status::OK; - // No more decompressed data, decompress the next block - RETURN_IF_ERROR(ReadCompressedBlock()); - if (num_buffered_records_in_compressed_block_ < 0) return parse_status_; - } + // Step 1 + RETURN_IF_ERROR(ReadCompressedBlock()); + if (num_buffered_records_in_compressed_block_ < 0) return parse_status_; // Step 2 - RETURN_IF_ERROR(ProcessDecompressedBlock()); - - // If we're finished processing the current block, read the marker + sync of - // the next block. We don't do this at the beginning of the loop since - // BaseSequenceScanner starts us after a sync, meaning we start by reading - // data. - if (num_buffered_records_in_compressed_block_ == 0) { - // If we are exactly at the end of the scan range, we don't attempt to - // read a sync marker because we may be at the end of file, and we don't - // want to trigger an "incomplete read" parse status and abort the - // query. This is safe to do even if we're not at the end of the file - // since we'll be finished after reading this sync. - // TODO: check EOF instead - if (stream_->bytes_left() == 0) return Status::OK; - - // Read the sync indicator and check the sync block. - int sync_indicator; - if (!stream_->ReadInt(&sync_indicator, &parse_status_)) { - // We've reached the end of the file - // TODO: check that we actually reached the end of the file - DCHECK(stream_->eosr()); - return Status::OK; - } - if (sync_indicator != -1) { - if (state_->LogHasSpace()) { - stringstream ss; - ss << "Expecting sync indicator (-1) at file offset " - << (stream_->file_offset() - sizeof(int)) << ". " - << "Sync indicator found " << sync_indicator << "."; - state_->LogError(ss.str()); - } - return Status("Bad sync hash"); - } - RETURN_IF_ERROR(ReadSync()); + while (num_buffered_records_in_compressed_block_ > 0) { + RETURN_IF_ERROR(ProcessDecompressedBlock()); } + + // SequenceFiles don't end with syncs + if (stream_->eof()) return Status::OK; + + // Step 3 + int sync_indicator; + RETURN_IF_FALSE(stream_->ReadInt(&sync_indicator, &parse_status_)); + if (sync_indicator != -1) { + if (state_->LogHasSpace()) { + stringstream ss; + ss << "Expecting sync indicator (-1) at file offset " + << (stream_->file_offset() - sizeof(int)) << ". " + << "Sync indicator found " << sync_indicator << "."; + state_->LogError(ss.str()); + } + return Status("Bad sync hash"); + } + RETURN_IF_ERROR(ReadSync()); } return Status::OK; @@ -252,7 +219,7 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() { if (scan_node_->materialized_slots().empty()) { // Handle case where there are no slots to materialize (e.g. count(*)) num_to_process = WriteEmptyTuples(context_, tuple_row, num_to_process); - if (num_to_process > 0) CommitRows(num_to_process); + CommitRows(num_to_process); COUNTER_UPDATE(scan_node_->rows_read_counter(), num_to_process); return Status::OK; } @@ -331,17 +298,11 @@ Status HdfsSequenceScanner::ProcessRange() { // this on each record. SCOPED_TIMER(scan_node_->materialize_tuple_timer()); - bool eosr = false; - while (!eosr && !stream_->eof()) { + while (!finished()) { DCHECK_GT(record_locations_.size(), 0); // Get the next compressed or uncompressed record. 
RETURN_IF_ERROR( - GetRecord(&record_locations_[0].record, &record_locations_[0].len, &eosr)); - - if (eosr) { - DCHECK(record_locations_[0].record == NULL); - break; - } + GetRecord(&record_locations_[0].record, &record_locations_[0].len)); MemPool* pool; TupleRow* tuple_row_mem; @@ -383,6 +344,17 @@ Status HdfsSequenceScanner::ProcessRange() { COUNTER_UPDATE(scan_node_->rows_read_counter(), 1); if (scan_node_->ReachedLimit()) break; if (context_->cancelled()) return Status::CANCELLED; + + // Sequence files don't end with syncs + if (stream_->eof()) return Status::OK; + + // Check for sync by looking for the marker that precedes syncs. + int marker; + RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ true)); + if (marker == SYNC_MARKER) { + RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false)); + RETURN_IF_ERROR(ReadSync()); + } } return Status::OK; @@ -452,35 +424,18 @@ Status HdfsSequenceScanner::ReadFileHeader() { RETURN_IF_FALSE(stream_->ReadBytes(SYNC_HASH_SIZE, &sync, &parse_status_)); memcpy(header_->sync, sync, SYNC_HASH_SIZE); - if (header_->is_compressed && !seq_header->is_row_compressed && !stream_->eof()) { - // With block compression, record blocks have a leading -1 marker and sync - // (i.e., we just read the sync in the file header, but there is another - // sync immediately following it which is the beginning of the first - // block). We include this extra marker and sync in the header so that - // BaseSequenceScanner skips directly to the actual data in the first record - // block. - // TODO: this will cause the entire query to fail if this sync is corrupt - int marker; - RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_)); - if (marker != HdfsSequenceScanner::SYNC_MARKER) { - return Status("Didn't find sync marker after file header"); - } - RETURN_IF_ERROR(ReadSync()); - } - header_->header_size = stream_->total_bytes_returned(); + + if (!header_->is_compressed || seq_header->is_row_compressed) { + // Block-compressed scan ranges have an extra sync following the sync in the header, + // all other formats do not + header_->header_size -= SYNC_HASH_SIZE; + } return Status::OK; } -Status HdfsSequenceScanner::ReadBlockHeader(bool* sync) { +Status HdfsSequenceScanner::ReadBlockHeader() { RETURN_IF_FALSE(stream_->ReadInt(¤t_block_length_, &parse_status_)); - *sync = false; - if (current_block_length_ == HdfsSequenceScanner::SYNC_MARKER) { - RETURN_IF_ERROR(ReadSync()); - RETURN_IF_FALSE( - stream_->ReadInt(¤t_block_length_, &parse_status_)); - *sync = true; - } if (current_block_length_ < 0) { stringstream ss; int64_t position = stream_->file_offset(); diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h index fee206f48..d1381537a 100644 --- a/be/src/exec/hdfs-sequence-scanner.h +++ b/be/src/exec/hdfs-sequence-scanner.h @@ -190,10 +190,10 @@ class HdfsSequenceScanner : public BaseSequenceScanner { // This is always "org.apache.hadoop.io.Text" static const char* const SEQFILE_VALUE_CLASS_NAME; - // Read the record header, return if there was a sync block. + // Read the record header. // Sets: // current_block_length_ - Status ReadBlockHeader(bool* sync); + Status ReadBlockHeader(); // Process an entire block compressed scan range. Block compressed ranges are // more common and can be parsed more efficiently in larger pieces. @@ -212,8 +212,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner { // Output: // record_ptr: ponter to the record. 
// record_len: length of the record - // eors: set to true if we are at the end of the scan range. - Status GetRecord(uint8_t** record_ptr, int64_t *record_len, bool* eosr); + Status GetRecord(uint8_t** record_ptr, int64_t *record_len); // Appends the current file and line to the RuntimeState's error log. // row_idx is 0-based (in current batch) where the parse error occurred. diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index ed2efaa1e..1d7d7e14c 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -264,9 +264,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { } // Commit the rows to the row batch and scan node - if (num_tuples_materialized > 0) { - CommitRows(num_tuples_materialized); - } + CommitRows(num_tuples_materialized); COUNTER_UPDATE(scan_node_->rows_read_counter(), *num_tuples); @@ -286,9 +284,16 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) { *eosr = false; Status status; - stream_->GetBytes(num_bytes, reinterpret_cast(&byte_buffer_ptr_), - &byte_buffer_read_size_, eosr, &status); + if (num_bytes > 0) { + stream_->GetBytes(num_bytes, reinterpret_cast(&byte_buffer_ptr_), + &byte_buffer_read_size_, &status); + } else { + DCHECK_EQ(num_bytes, 0); + status = stream_->GetBuffer(false, reinterpret_cast(&byte_buffer_ptr_), + &byte_buffer_read_size_); + } byte_buffer_end_ = byte_buffer_ptr_ + byte_buffer_read_size_; + *eosr = stream_->eosr(); return status; } diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index e8af2bf83..4cae132d3 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -52,7 +52,7 @@ void ScannerContext::AttachCompletedResources(RowBatch* batch, bool done) { } ScannerContext::Stream::Stream(ScannerContext* parent) - : parent_(parent), total_len_(0), + : parent_(parent), boundary_pool_(new MemPool(parent->state_->mem_limits())), boundary_buffer_(new StringBuffer(boundary_pool_.get())) { } @@ -63,26 +63,27 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { stream->file_desc_ = scan_node_->GetFileDesc(stream->filename()); stream->scan_range_start_ = range->offset(); stream->total_bytes_returned_ = 0; - stream->current_buffer_pos_ = NULL; + stream->io_buffer_pos_ = NULL; stream->read_past_buffer_size_ = DEFAULT_READ_PAST_SIZE; - stream->total_len_ = range->len(); - stream->read_eosr_ = false; - stream->current_buffer_ = NULL; - stream->current_buffer_bytes_left_ = 0; + stream->io_buffer_ = NULL; + stream->io_buffer_bytes_left_ = 0; + stream->boundary_buffer_bytes_left_ = 0; + stream->output_buffer_pos_ = NULL; + stream->output_buffer_bytes_left_ = &stream->io_buffer_bytes_left_; streams_.push_back(stream); return stream; } void ScannerContext::Stream::ReturnAllBuffers() { - if (current_buffer_ != NULL) completed_buffers_.push_back(current_buffer_); - for (list::iterator it = completed_buffers_.begin(); - it != completed_buffers_.end(); ++it) { + if (io_buffer_ != NULL) completed_io_buffers_.push_back(io_buffer_); + for (list::iterator it = completed_io_buffers_.begin(); + it != completed_io_buffers_.end(); ++it) { (*it)->Return(); --parent_->scan_node_->num_owned_io_buffers_; } - current_buffer_ = NULL; - current_buffer_pos_ = NULL; - current_buffer_bytes_left_ = 0; + io_buffer_ = NULL; + io_buffer_pos_ = NULL; + io_buffer_bytes_left_ = 0; // Cancel the underlying scan range to 
clean up any queued buffers there if (scan_range_ != NULL) scan_range_->Cancel(Status::CANCELLED); @@ -92,14 +93,14 @@ void ScannerContext::Stream::AttachCompletedResources(RowBatch* batch, bool done DCHECK(batch != NULL); if (done) { // Mark any pending resources as completed - if (current_buffer_ != NULL) completed_buffers_.push_back(current_buffer_); - current_buffer_ = NULL; + if (io_buffer_ != NULL) completed_io_buffers_.push_back(io_buffer_); + io_buffer_ = NULL; // Cancel the underlying scan range to clean up any queued buffers there scan_range_->Cancel(Status::CANCELLED); } - for (list::iterator it = completed_buffers_.begin(); - it != completed_buffers_.end(); ++it) { + for (list::iterator it = completed_io_buffers_.begin(); + it != completed_io_buffers_.end(); ++it) { if (compact_data_) { (*it)->Return(); --parent_->scan_node_->num_owned_io_buffers_; @@ -110,7 +111,7 @@ void ScannerContext::Stream::AttachCompletedResources(RowBatch* batch, bool done // there are too many, we should compact. } } - completed_buffers_.clear(); + completed_io_buffers_.clear(); if (!compact_data_) { // If we're not done, keep using the last chunk allocated in boundary_pool_ so we @@ -120,157 +121,156 @@ void ScannerContext::Stream::AttachCompletedResources(RowBatch* batch, bool done } Status ScannerContext::Stream::GetNextBuffer() { - if (current_buffer_ != NULL) { - read_eosr_ = current_buffer_->eosr(); - completed_buffers_.push_back(current_buffer_); - current_buffer_ = NULL; + if (parent_->cancelled()) return Status::CANCELLED; + + // io_buffer_ should only be null the first time this is called + DCHECK(io_buffer_ != NULL || + (total_bytes_returned_ == 0 && completed_io_buffers_.empty())); + + // We can't use the eosr() function because it reflects how many bytes have been + // returned, not if we're fetched all the buffers in the scan range + bool eosr = false; + if (io_buffer_ != NULL) { + eosr = io_buffer_->eosr(); + completed_io_buffers_.push_back(io_buffer_); + io_buffer_ = NULL; } - if (!read_eosr_) RETURN_IF_ERROR(scan_range_->GetNext(¤t_buffer_)); - - if (current_buffer_ == NULL) { - // NULL indicates eosr - current_buffer_pos_ = NULL; - current_buffer_bytes_left_ = 0; + if (!eosr) { + RETURN_IF_ERROR(scan_range_->GetNext(&io_buffer_)); } else { - ++parent_->scan_node_->num_owned_io_buffers_; - current_buffer_pos_ = reinterpret_cast(current_buffer_->buffer()); - current_buffer_bytes_left_ = current_buffer_->len(); - } + // TODO: we're reading past this scan range so this is likely a remote read. + // Update when the IoMgr has better support for remote reads. + int64_t offset = file_offset() + boundary_buffer_bytes_left_; + DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange( + filename(), read_past_buffer_size_, offset, -1, scan_range_->disk_id()); + RETURN_IF_ERROR(parent_->state_->io_mgr()->Read( + parent_->scan_node_->reader_context(), range, &io_buffer_)); + } + + DCHECK(io_buffer_ != NULL); + ++parent_->scan_node_->num_owned_io_buffers_; + io_buffer_pos_ = reinterpret_cast(io_buffer_->buffer()); + io_buffer_bytes_left_ = io_buffer_->len(); + return Status::OK; } -Status ScannerContext::Stream::GetRawBytes(uint8_t** out_buffer, int* len, bool* eos) { +Status ScannerContext::Stream::GetBuffer(bool peek, uint8_t** out_buffer, int* len) { *out_buffer = NULL; *len = 0; + if (eosr()) return Status::OK; if (parent_->cancelled()) { DCHECK(*out_buffer == NULL); return Status::CANCELLED; } - // If there is no current data, fetch the first available buffer. 
- if (current_buffer_bytes_left_ == 0) { - return GetBytesInternal(0, out_buffer, true, len, eos); - } + if (boundary_buffer_bytes_left_ > 0) { + *out_buffer = boundary_buffer_pos_; + // Don't return more bytes past eosr + *len = min(static_cast(boundary_buffer_bytes_left_), bytes_left()); + DCHECK_GE(*len, 0); + if (!peek) { + boundary_buffer_pos_ += *len; + boundary_buffer_bytes_left_ -= *len; + total_bytes_returned_ += *len; + } + return Status::OK; + } - *out_buffer = current_buffer_pos_; - *len = current_buffer_bytes_left_; - *eos = current_buffer_->eosr(); + if (io_buffer_bytes_left_ == 0) { + output_buffer_pos_ = &io_buffer_pos_; + RETURN_IF_ERROR(GetNextBuffer()); + } + DCHECK(io_buffer_ != NULL); + + *out_buffer = io_buffer_pos_; + *len = io_buffer_bytes_left_; + if (!peek) { + io_buffer_bytes_left_ = 0; + io_buffer_pos_ += *len; + total_bytes_returned_ += *len; + } + DCHECK_GE(bytes_left(), 0); return Status::OK; } -Status ScannerContext::Stream::GetBytesInternal(int requested_len, - uint8_t** out_buffer, bool peek, int* out_len, bool* eos) { - *out_len = 0; +Status ScannerContext::Stream::GetBytesInternal( + int requested_len, uint8_t** out_buffer, bool peek, int* out_len) { + DCHECK_GT(requested_len, boundary_buffer_bytes_left_); *out_buffer = NULL; - *eos = true; - if (current_buffer_bytes_left_ == 0) RETURN_IF_ERROR(GetNextBuffer()); - - // The previous boundary buffer must have been processed by the scanner. - if (compact_data()) { - boundary_buffer_->Clear(); - } else { - boundary_buffer_->Reset(); - } - - // The caller requested a complete buffer but there are no more bytes - if (requested_len == 0 && eosr()) return Status::OK; - - // Loop and wait for the next buffer - while (true) { - if (parent_->cancelled()) return Status::CANCELLED; - if (current_buffer_bytes_left_ == 0) RETURN_IF_ERROR(GetNextBuffer()); - - if (requested_len == 0) { - DCHECK(current_buffer_ != NULL); - DCHECK(*out_len == 0); - requested_len = current_buffer_bytes_left_; - } - - // Not enough bytes, copy the end of this buffer and combine it with the next one - if (requested_len > current_buffer_bytes_left_) { - if (current_buffer_ != NULL) { - boundary_buffer_->Append(current_buffer_pos_, current_buffer_bytes_left_); - *out_len += current_buffer_bytes_left_; - requested_len -= current_buffer_bytes_left_; - total_bytes_returned_ += current_buffer_bytes_left_; - RETURN_IF_ERROR(GetNextBuffer()); - } - - if (!eosr()) continue; - - // We are at the end of the scan range and there are still not enough bytes - // to satisfy the request. Issue a sync read to the io mgr and keep going - DCHECK(current_buffer_ == NULL); - DCHECK_EQ(current_buffer_bytes_left_, 0); - - // TODO: we're reading past this scan range so this is likely a remote read. - // Update when the IoMgr has better support for remote reads. 
- DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange( - filename(), read_past_buffer_size_, file_offset(), -1, scan_range_->disk_id()); - - DiskIoMgr::BufferDescriptor* buffer_desc; - Status status = parent_->state_->io_mgr()->Read( - parent_->scan_node_->reader_context(), range, &buffer_desc); - if (!status.ok()) { - if (buffer_desc != NULL) buffer_desc->Return(); - return status; - } - - ++parent_->scan_node_->num_owned_io_buffers_; - - DCHECK(!peek); - current_buffer_ = buffer_desc; - current_buffer_bytes_left_ = current_buffer_->len(); - current_buffer_pos_ = reinterpret_cast(current_buffer_->buffer()); - - if (current_buffer_bytes_left_ == 0) { - // Tried to read past but there were no more bytes (i.e. EOF) - *out_buffer = reinterpret_cast(boundary_buffer_->str().ptr); - *eos = true; - return Status::OK; - } - continue; - } - - // We have enough bytes - int num_bytes = min(current_buffer_bytes_left_, requested_len); - *out_len += num_bytes; - if (peek) { - *out_buffer = current_buffer_pos_; + if (boundary_buffer_bytes_left_ == 0) { + if (compact_data()) { + boundary_buffer_->Clear(); } else { - DCHECK(!peek); - current_buffer_bytes_left_ -= num_bytes; - total_bytes_returned_ += num_bytes; - DCHECK_GE(current_buffer_bytes_left_, 0); - - if (boundary_buffer_->Empty()) { - // No stitching, just return the memory - *out_buffer = current_buffer_pos_; - } else { - boundary_buffer_->Append(current_buffer_pos_, num_bytes); - *out_buffer = reinterpret_cast(boundary_buffer_->str().ptr); - } - current_buffer_pos_ += num_bytes; + boundary_buffer_->Reset(); } - - *eos = (current_buffer_bytes_left_ == 0) && current_buffer_->eosr(); - return Status::OK; } + + while (requested_len > boundary_buffer_bytes_left_ + io_buffer_bytes_left_) { + // We need to fetch more bytes. Copy the end of the current buffer and fetch the next + // one. + boundary_buffer_->Append(io_buffer_pos_, io_buffer_bytes_left_); + boundary_buffer_bytes_left_ += io_buffer_bytes_left_; + + RETURN_IF_ERROR(GetNextBuffer()); + if (parent_->cancelled()) return Status::CANCELLED; + + if (io_buffer_bytes_left_ == 0) { + // No more bytes (i.e. 
+      break;
+    }
+  }
+
+  // We have enough bytes in io_buffer_ or couldn't read more bytes
+  int requested_bytes_left = requested_len - boundary_buffer_bytes_left_;
+  DCHECK_GE(requested_len, 0);
+  int num_bytes = min(io_buffer_bytes_left_, requested_bytes_left);
+  *out_len = boundary_buffer_bytes_left_ + num_bytes;
+  DCHECK_LE(*out_len, requested_len);
+
+  if (boundary_buffer_bytes_left_ == 0) {
+    // No stitching, just return the memory
+    output_buffer_pos_ = &io_buffer_pos_;
+    output_buffer_bytes_left_ = &io_buffer_bytes_left_;
+  } else {
+    boundary_buffer_->Append(io_buffer_pos_, num_bytes);
+    boundary_buffer_bytes_left_ += num_bytes;
+    boundary_buffer_pos_ = reinterpret_cast<uint8_t*>(boundary_buffer_->str().ptr) +
+        boundary_buffer_->Size() - boundary_buffer_bytes_left_;
+    io_buffer_bytes_left_ -= num_bytes;
+    io_buffer_pos_ += num_bytes;
+
+    output_buffer_pos_ = &boundary_buffer_pos_;
+    output_buffer_bytes_left_ = &boundary_buffer_bytes_left_;
+  }
+  *out_buffer = *output_buffer_pos_;
+
+  if (!peek) {
+    total_bytes_returned_ += *out_len;
+    if (boundary_buffer_bytes_left_ == 0) {
+      io_buffer_bytes_left_ -= num_bytes;
+      io_buffer_pos_ += num_bytes;
+    } else {
+      DCHECK_EQ(boundary_buffer_bytes_left_, *out_len);
+      boundary_buffer_bytes_left_ = 0;
+    }
+  }
+
+  return Status::OK;
 }
 
 void ScannerContext::Close() {
   // Set variables to NULL to make sure this object is not being used after Close()
   for (int i = 0; i < streams_.size(); ++i) {
-    streams_[i]->read_eosr_ = false;
-    streams_[i]->current_buffer_ = NULL;
-    streams_[i]->current_buffer_pos_ = NULL;
+    DCHECK(streams_[i]->io_buffer_ == NULL);
+    streams_[i]->io_buffer_pos_ = NULL;
   }
 
   for (int i = 0; i < streams_.size(); ++i) {
-    DCHECK(streams_[i]->completed_buffers_.empty());
+    DCHECK(streams_[i]->completed_io_buffers_.empty());
   }
 }
 
@@ -278,10 +278,6 @@ bool ScannerContext::cancelled() const {
   return scan_node_->done_;
 }
 
-bool ScannerContext::Stream::eof() {
-  return file_offset() == file_desc_->file_length;
-}
-
 Status ScannerContext::Stream::ReportIncompleteRead(int length, int bytes_read) {
   stringstream ss;
   ss << "Tried to read " << length << " bytes but could only read "
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index ea97ba1ce..b7684f3a5 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -65,30 +65,29 @@ class ScannerContext {
   // one stream; for columnar, there is one stream per column.
   class Stream {
    public:
-    // Returns the next *len bytes or an error. This can block if bytes are not
+    // Returns up to requested_len bytes or an error. This can block if bytes are not
     // available.
+    // - requested_len is the number of bytes requested. This function will return
+    //   those number of bytes unless end of file or an error occurred.
+    // - If peek is true, the scan range position is not incremented (i.e. repeated calls
+    //   with peek = true will return the same data).
     // - *buffer on return is a pointer to the buffer. The memory is owned by
     //   the ScannerContext and should not be modified. If the buffer is entirely
     //   from one disk io buffer, a pointer inside that buffer is returned directly.
-    //   if the requested buffer straddles io buffers, a copy is done here.
-    // - requested_len is the number of bytes requested. This function will return
-    //   those number of bytes unless end of file or an error occurred.
-    //   if requested_len is 0, the next complete buffer will be returned
+    //   If the requested buffer straddles io buffers, a copy is done here.
     // - *out_len is the number of bytes returned.
-    // - *eos is set to true if all the bytes in this scan range are returned.
     // - *status is set if there is an error.
     // Returns true if the call was success (i.e. status->ok())
     // This should only be called from the scanner thread.
-    // Note that this will return bytes past the end of the scan range if
-    // requested (e.g., this can be called again after *eos is set to true).
+    // Note that this will return bytes past the end of the scan range until the end of
+    // the file.
     bool GetBytes(int requested_len, uint8_t** buffer, int* out_len,
-        bool* eos, Status* status);
+        Status* status, bool peek = false);
 
-    // Gets the bytes from the first available buffer without advancing the scan
-    // range location (e.g. repeated calls to this function will return the same thing).
-    // If the buffer is the last one in the scan range, *eos will be set to true.
-    // If we are past the end of the scan range, *out_len will be 0 and *eos will be true.
-    Status GetRawBytes(uint8_t** buffer, int* out_len, bool* eos);
+    // Gets the bytes from the first available buffer within the scan range. This may be
+    // the boundary buffer used to stitch IO buffers together.
+    // If we are past the end of the scan range, no bytes are returned.
+    Status GetBuffer(bool peek, uint8_t** buffer, int* out_len);
 
     // Sets whether of not the resulting tuples have a compact format. If not, the
     // io buffers must be attached to the row batch, otherwise they can be returned
@@ -110,16 +109,16 @@ class ScannerContext {
     int64_t bytes_left() { return scan_range_->len() - total_bytes_returned_; }
 
     // If true, all bytes in this scan range have been returned
-    bool eosr() const { return read_eosr_ || total_bytes_returned_ >= total_len_; }
+    bool eosr() const { return total_bytes_returned_ >= scan_range_->len(); }
 
     // If true, the stream has reached the end of the file.
-    bool eof();
+    bool eof() const;
 
     const char* filename() { return scan_range_->file(); }
     const DiskIoMgr::ScanRange* scan_range() { return scan_range_; }
 
     // Returns the buffer's current offset in the file.
-    int64_t file_offset() { return scan_range_start_ + total_bytes_returned_; }
+    int64_t file_offset() const { return scan_range_start_ + total_bytes_returned_; }
 
     // Returns the total number of bytes returned
     int64_t total_bytes_returned() { return total_bytes_returned_; }
@@ -130,7 +129,7 @@ class ScannerContext {
 
     // Read an Integer primitive value written using Java serialization.
     // Equivalent to java.io.DataInput.readInt()
-    bool ReadInt(int32_t* val, Status*);
+    bool ReadInt(int32_t* val, Status*, bool peek = false);
 
     // Read a variable-length Long value written using Writable serialization.
     // Ref: org.apache.hadoop.io.WritableUtils.readVLong()
@@ -148,7 +147,7 @@ class ScannerContext {
 
     // Read length bytes into the supplied buffer. The returned buffer is owned
    // by this object.
-    bool ReadBytes(int length, uint8_t** buf, Status*);
+    bool ReadBytes(int length, uint8_t** buf, Status*, bool peek = false);
 
     // Read a Writable Text value from the supplied file.
     // Ref: org.apache.hadoop.io.WritableUtils.readString()
@@ -174,48 +173,55 @@ class ScannerContext {
 
     // Total number of bytes returned from GetBytes()
     int64_t total_bytes_returned_;
 
-    // Byte offset into the current (first) io buffer.
-    uint8_t* current_buffer_pos_;
-
-    // Bytes left in the first buffer
-    int current_buffer_bytes_left_;
-
     // The buffer size to use for when reading past the end of the scan range. A
     // default value is pickd and scanners can overwrite it (i.e. the scanner knows
     // more about the file format)
     int read_past_buffer_size_;
 
-    // Total number of bytes that's expected to to be read from this stream.  The
-    // actual number could be higher if we need to read bytes past the end.
-    int64_t total_len_;
+    // The current io buffer. This starts as NULL before we've read any bytes.
+    DiskIoMgr::BufferDescriptor* io_buffer_;
 
-    // Set to true when a buffer returns the end of the scan range.
-    bool read_eosr_;
+    // Next byte to read in io_buffer_
+    uint8_t* io_buffer_pos_;
 
-    // Pool for allocating boundary buffers.
+    // Bytes left in io_buffer_
+    int io_buffer_bytes_left_;
+
+    // The boundary buffer is used to copy multiple IO buffers from the scan range into a
+    // single buffer to return to the scanner. After copying all or part of an IO buffer
+    // into the boundary buffer, the current buffer's state is updated to no longer
+    // include the copied bytes (e.g., io_buffer_bytes_left_ is decremented).
+    // Conceptually, the data in the boundary buffer always comes before that in the
+    // current buffer, and all the bytes in the stream are either already returned to the
+    // scanner, in the current IO buffer, or in the boundary buffer.
     boost::scoped_ptr<MemPool> boundary_pool_;
     boost::scoped_ptr<StringBuffer> boundary_buffer_;
+    uint8_t* boundary_buffer_pos_;
+    int boundary_buffer_bytes_left_;
+
+    // Points to either io_buffer_pos_ or boundary_buffer_pos_
+    uint8_t** output_buffer_pos_;
+    // Points to either io_buffer_bytes_left_ or boundary_buffer_bytes_left_
+    int* output_buffer_bytes_left_;
 
     // List of buffers that are completed but still have bytes referenced by the caller.
     // On the next GetBytes() call, these buffers are released (the caller by calling
     // GetBytes() signals it is done with its previous bytes). At this point the
     // buffers are either returned to the io mgr or attached to the current row batch.
-    std::list<DiskIoMgr::BufferDescriptor*> completed_buffers_;
-
-    // The current io buffer. This starts as NULL before we've read any bytes
-    // and then becomes NULL when we've finished the scan range.
-    DiskIoMgr::BufferDescriptor* current_buffer_;
+    std::list<DiskIoMgr::BufferDescriptor*> completed_io_buffers_;
 
     Stream(ScannerContext* parent);
 
-    // GetBytes helper to handle the slow path
+    // GetBytes helper to handle the slow path.
     // If peek is set then return the data but do not move the current offset.
-    // Updates current_buffer_.
-    Status GetBytesInternal(int requested_len, uint8_t** buffer,
-        bool peek, int* out_len, bool* eos);
+    Status GetBytesInternal(int requested_len, uint8_t** buffer, bool peek, int* out_len);
 
-    // Gets (and blocks) for the next io buffer.
-    // Updates current_buffer_.
+    // Gets (and blocks) for the next io buffer. After fetching all buffers in the scan
+    // range, performs synchronous reads past the scan range until EOF. Updates
+    // io_buffer_, io_buffer_bytes_left_, and io_buffer_pos_. If GetNextBuffer() is
+    // called after all bytes in the file have been returned, io_buffer_bytes_left_ will
+    // be set to 0. In the non-error case, io_buffer_ is never set to NULL, even if it
+    // contains 0 bytes.
     Status GetNextBuffer();
 
     // Attach all completed io buffers and the boundary mem pool to batch.
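As an aside (not part of the change itself), the boundary-buffer scheme documented above boils down to: a request that fits entirely inside the current IO buffer is served with a pointer straight into that buffer, and only a request that straddles IO buffers is copied into the boundary buffer and served from there. The standalone sketch below models that idea; ToyStream, its std::string buffers, and the two-buffer limit are illustrative simplifications, not Impala types.

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Toy model of the stitching scheme: the fast path returns a pointer into the
// current IO buffer (zero copy); the slow path copies the tail of the current
// buffer plus the head of the next one into a separate "boundary" buffer.
// Unlike the real GetBytesInternal() loop, this assumes a request spans at
// most two buffers.
struct ToyStream {
  std::vector<std::string> io_buffers;  // stands in for DiskIoMgr buffers
  std::size_t buffer_idx = 0;           // current IO buffer
  std::size_t pos = 0;                  // offset into the current IO buffer
  std::string boundary;                 // stands in for boundary_buffer_

  const char* GetBytes(std::size_t len) {
    const std::string& cur = io_buffers[buffer_idx];
    if (pos + len <= cur.size()) {
      const char* result = cur.data() + pos;  // fast path: no copy
      pos += len;
      return result;
    }
    // Slow path: stitch across the buffer boundary.
    boundary.assign(cur, pos, cur.size() - pos);
    ++buffer_idx;
    pos = len - boundary.size();
    boundary.append(io_buffers[buffer_idx], 0, pos);
    return boundary.data();
  }
};

int main() {
  ToyStream s;
  s.io_buffers = {"hello ", "world"};
  std::cout << std::string(s.GetBytes(3), 3) << '\n';  // "hel"    (pointer into buffer 0)
  std::cout << std::string(s.GetBytes(6), 6) << '\n';  // "lo wor" (stitched copy)
  std::cout << std::string(s.GetBytes(2), 2) << '\n';  // "ld"     (pointer into buffer 1)
  return 0;
}

Keeping output_buffer_pos_ / output_buffer_bytes_left_ as pointers to whichever source is active is what lets the inline GetBytes() fast path hand out and advance either the IO buffer or the boundary buffer without caring where the bytes came from.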
@@ -275,4 +281,3 @@ class ScannerContext {
 }
 
 #endif
-
diff --git a/be/src/exec/scanner-context.inline.h b/be/src/exec/scanner-context.inline.h
index 2c24d36d4..62728ec24 100644
--- a/be/src/exec/scanner-context.inline.h
+++ b/be/src/exec/scanner-context.inline.h
@@ -18,6 +18,7 @@
 
 #include "exec/scanner-context.h"
 #include "exec/read-write-util.h"
+#include "runtime/string-buffer.h"
 
 using namespace impala;
 
@@ -28,49 +29,36 @@ using namespace impala;
 // is the path used by sequence/rc/parquet file formats to read a very small number
 // (i.e. single int) of bytes.
 inline bool ScannerContext::Stream::GetBytes(int requested_len, uint8_t** buffer,
-    int* out_len, bool* eos, Status* status) {
-
+    int* out_len, Status* status, bool peek) {
   if (UNLIKELY(requested_len == 0)) {
-    *status = GetBytesInternal(requested_len, buffer, false, out_len, eos);
-    return status->ok();
+    *out_len = 0;
+    return true;
   }
-
-  // Note: the fast path does not grab any locks even though another thread might be
-  // updating current_buffer_bytes_left_, current_buffer_ and current_buffer_pos_.
-  // See the implementation of AddBuffer() on why this is okay.
-  if (LIKELY(requested_len < current_buffer_bytes_left_)) {
-    *eos = false;
-    // Memory barrier to guarantee current_buffer_pos_ is not read before the
-    // above if statement.
-    __sync_synchronize();
-    DCHECK(current_buffer_ != NULL);
-    *buffer = current_buffer_pos_;
+  if (LIKELY(requested_len <= *output_buffer_bytes_left_)) {
     *out_len = requested_len;
-    current_buffer_bytes_left_ -= requested_len;
-    current_buffer_pos_ += requested_len;
-    total_bytes_returned_ += *out_len;
-    if (UNLIKELY(current_buffer_bytes_left_ == 0)) {
-      *eos = current_buffer_->eosr();
+    *buffer = *output_buffer_pos_;
+    if (LIKELY(!peek)) {
+      total_bytes_returned_ += *out_len;
+      *output_buffer_pos_ += *out_len;
+      *output_buffer_bytes_left_ -= *out_len;
     }
     return true;
   }
-  *status = GetBytesInternal(requested_len, buffer, false, out_len, eos);
+  DCHECK_GT(requested_len, 0);
+  *status = GetBytesInternal(requested_len, buffer, peek, out_len);
   return status->ok();
 }
 
-inline bool ScannerContext::Stream::ReadBytes(int length, uint8_t** buf, Status* status) {
+inline bool ScannerContext::Stream::ReadBytes(
+    int length, uint8_t** buf, Status* status, bool peek) {
   if (UNLIKELY(length < 0)) {
     *status = Status("Negative length");
     return false;
   }
-  if (UNLIKELY(length == 0)) {
-    *status = Status::OK;
-    return true;
-  }
   int bytes_read;
-  bool dummy_eos;
-  RETURN_IF_FALSE(GetBytes(length, buf, &bytes_read, &dummy_eos, status));
+  RETURN_IF_FALSE(GetBytes(length, buf, &bytes_read, status, peek));
   if (UNLIKELY(length != bytes_read)) {
+    DCHECK_LT(bytes_read, length);
     *status = ReportIncompleteRead(length, bytes_read);
     return false;
   }
@@ -80,15 +68,11 @@ inline bool ScannerContext::Stream::ReadBytes(int length, uint8_t** buf, Status*
 
 // TODO: consider implementing a Skip in the context/stream object that's more
 // efficient than GetBytes.
 inline bool ScannerContext::Stream::SkipBytes(int length, Status* status) {
-  if (UNLIKELY(length == 0)) {
-    *status = Status::OK;
-    return true;
-  }
   uint8_t* dummy_buf;
   int bytes_read;
-  bool dummy_eos;
-  RETURN_IF_FALSE(GetBytes(length, &dummy_buf, &bytes_read, &dummy_eos, status));
+  RETURN_IF_FALSE(GetBytes(length, &dummy_buf, &bytes_read, status));
   if (UNLIKELY(length != bytes_read)) {
+    DCHECK_LT(bytes_read, length);
     *status = ReportIncompleteRead(length, bytes_read);
     return false;
   }
@@ -114,9 +98,9 @@ inline bool ScannerContext::Stream::ReadBoolean(bool* b, Status* status) {
   return true;
 }
 
-inline bool ScannerContext::Stream::ReadInt(int32_t* val, Status* status) {
+inline bool ScannerContext::Stream::ReadInt(int32_t* val, Status* status, bool peek) {
   uint8_t* bytes;
-  RETURN_IF_FALSE(ReadBytes(sizeof(uint32_t), &bytes, status));
+  RETURN_IF_FALSE(ReadBytes(sizeof(uint32_t), &bytes, status, peek));
   *val = ReadWriteUtil::GetInt(bytes);
   return true;
 }
@@ -174,6 +158,10 @@ inline bool ScannerContext::Stream::ReadZLong(int64_t* value, Status* status) {
   return true;
 }
 
+inline bool ScannerContext::Stream::eof() const {
+  return file_offset() == file_desc_->file_length;
+}
+
 #undef RETURN_IF_FALSE
 
 #endif
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 4e4534ca4..2e143a453 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -493,6 +493,8 @@ void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
 void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
   DCHECK(desc != NULL);
   unique_lock<mutex> lock(free_buffers_lock_);
+  DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
+      == free_buffer_descs_.end());
   free_buffer_descs_.push_back(desc);
 }
 
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 666b8dccb..655523cd5 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -94,6 +94,7 @@ class RowBatch {
   int AddRow() { return AddRows(1); }
 
   void CommitRows(int n) {
+    DCHECK_GE(n, 0);
     DCHECK_LE(num_rows_ + n, capacity_);
     num_rows_ += n;
     has_in_flight_row_ = false;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test
index 1c62eb90d..a6301ff5c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs-tiny-scan.test
@@ -17,11 +17,10 @@ string
 'ccccc'
 'eeeeeeee'
 ====
-# This fails due to IMP-636. Re-enable it once that bug gets fixed.
-#---- QUERY
-# select description1 from zipcode_incomes limit 10
-#---- TYPES
-#string
-#---- RESULTS
-#
-#====
+---- QUERY
+select count(*) from alltypessmall
+---- TYPES
+bigint
+---- RESULTS
+100
+====
diff --git a/tests/query_test/test_scan_range_lengths.py b/tests/query_test/test_scan_range_lengths.py
index 0267d0e5c..921b6eca4 100644
--- a/tests/query_test/test_scan_range_lengths.py
+++ b/tests/query_test/test_scan_range_lengths.py
@@ -25,9 +25,6 @@ class TestScanRangeLengths(ImpalaTestSuite):
         TestDimension('max_scan_range_length', *MAX_SCAN_RANGE_LENGTHS))
 
   def test_scan_ranges(self, vector):
-    if vector.get_value('table_format').file_format != 'text':
-      pytest.xfail(reason='IMP-636')
-
     vector.get_value('exec_option')['max_scan_range_length'] =\
         vector.get_value('max_scan_range_length')
     self.run_test_case('QueryTest/hdfs-tiny-scan', vector)
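As a usage illustration only (not taken from the patch): with the new peek argument a scanner can inspect a length prefix without consuming it, and then pull the prefix plus the payload in a single GetBytes() call, which goes through the boundary buffer only when the bytes straddle IO buffers. The sketch below uses just the signatures declared in scanner-context.h above; ReadOneRecord, its error strings, and the record layout are hypothetical, and it is only meant to compile inside the Impala backend tree.

#include "exec/scanner-context.h"

using namespace impala;

// Hypothetical helper: peek at a 4-byte record length, then consume the
// prefix plus the payload in one call.
static Status ReadOneRecord(ScannerContext::Stream* stream) {
  Status parse_status;
  int32_t record_len;
  // peek = true: the stream position does not advance, so a caller that decides
  // not to consume this record keeps its current offset.
  if (!stream->ReadInt(&record_len, &parse_status, true)) return parse_status;
  if (record_len < 0) return Status("Invalid record length");

  uint8_t* record;
  int bytes_read;
  const int total_len = static_cast<int>(sizeof(int32_t)) + record_len;
  // Non-peek call: advances the stream; the result is copied through the
  // boundary buffer only if it straddles IO buffers.
  if (!stream->GetBytes(total_len, &record, &bytes_read, &parse_status)) {
    return parse_status;
  }
  if (bytes_read < total_len) {
    // Reached end of file before a complete record.
    return Status("Incomplete record");
  }
  // ... record[4 .. total_len) would be decoded here ...
  return Status::OK;
}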