diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index 1bc0a460d..293995af6 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -261,7 +261,6 @@ Status HdfsAvroScanner::InitNewRange() {
   DCHECK(header_ != NULL);
   only_parsing_header_ = false;
   avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
-  template_tuple_ = context_->template_tuple();
   if (header_->is_compressed) {
     RETURN_IF_ERROR(Codec::CreateDecompressor(state_,
         data_buffer_pool_.get(), stream_->compact_data(),
@@ -343,7 +342,7 @@ Status HdfsAvroScanner::DecodeAvroData(int max_tuples, int64_t* num_records,
   for (int i = 0; i < n; ++i) {
     // Initialize tuple from the partition key template tuple before writing the
     // slots
-    InitTuple(template_tuple_, tuple);
+    InitTuple(context_->template_tuple(), tuple);

     // Decode record
     for (int j = 0; j < slot_descs_.size(); ++j) {
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 2a33a204c..1a08e64c9 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -133,6 +133,9 @@ class HdfsParquetScanner::ColumnReader {
   // Pointer to start of next value in data page
   uint8_t* data_;

+  // Decoder for bool values. Only valid if type is TYPE_BOOLEAN
+  BitReader bool_values_;
+
   // Decoder for definition. Only one of these is valid at a time, depending on
   // the data page metadata.
   RleDecoder rle_def_levels_;
@@ -178,6 +181,9 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
     // now complete, pass along the memory allocated for it.
     parent_->context_->AcquirePool(decompressed_data_pool_.get());

+  // Read the next data page, skipping page types we don't care about.
+  // We break out of this loop on the non-error case (either eosr or
+  // a data page was found).
   while (true) {
     DCHECK_EQ(num_buffered_values_, 0);
     RETURN_IF_ERROR(stream_->GetRawBytes(&buffer, &num_bytes, &eos));
@@ -240,6 +246,7 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
           reinterpret_cast<char*>(decompressed_buffer));
       if (!success) return Status("Corrupt data page");
       data_ = decompressed_buffer;
+      data_size = current_page_header_.uncompressed_page_size;
     } else {
       // TODO: handle the other codecs.
       DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
@@ -249,8 +256,9 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
     int32_t num_definition_bytes = 0;
     switch (current_page_header_.data_page_header.definition_level_encoding) {
       case parquet::Encoding::RLE:
-        num_definition_bytes = *reinterpret_cast<int32_t*>(data_);
-        data_ += sizeof(int32_t);
+        if (!ReadWriteUtil::Read<int32_t>(&data_, &data_size, &num_definition_bytes, &status)) {
+          return status;
+        }
         rle_def_levels_ = RleDecoder(data_, num_definition_bytes);
         break;
       case parquet::Encoding::BIT_PACKED:
@@ -266,10 +274,17 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
     }
     DCHECK_GT(num_definition_bytes, 0);
     data_ += num_definition_bytes;
+    data_size -= num_definition_bytes;
+
+    if (desc_->type() == TYPE_BOOLEAN) {
+      // Initialize bool decoder
+      bool_values_ = BitReader(data_, data_size);
+    }
+
     break;
   }

-  return status;
+  return Status::OK;
 }

 // TODO More codegen here as well.
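The RLE definition-level change above is the important one: the old code dereferenced a 4-byte length prefix without checking that four bytes were actually left in the page, so a truncated or corrupt file could read past the buffer. The new ReadWriteUtil::Read call threads data_size through and turns the overrun into a returned Status. A minimal sketch of what such a checked read looks like (hypothetical standalone helper, not Impala's actual ReadWriteUtil):

#include <cstdint>
#include <cstring>

// Hypothetical bounds-checked read: advances *buf past a T-sized value,
// shrinking *buf_len, and fails instead of reading past the buffer.
template <typename T>
bool CheckedRead(uint8_t** buf, int* buf_len, T* val) {
  if (*buf_len < static_cast<int>(sizeof(T))) return false;  // would overrun
  memcpy(val, *buf, sizeof(T));  // memcpy also avoids an unaligned load
  *buf += sizeof(T);
  *buf_len -= sizeof(T);
  return true;
}

Using memcpy here sidesteps the unaligned int32 access the old reinterpret_cast version relied on, in addition to the bounds check.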
@@ -315,6 +330,15 @@ inline bool HdfsParquetScanner::ColumnReader::ReadValue(MemPool* pool, Tuple* tu
   void* slot = tuple->GetSlot(desc_->tuple_offset());
   switch (desc_->type()) {
+    case TYPE_BOOLEAN: {
+      bool valid;
+      valid = bool_values_.GetBool(reinterpret_cast<bool*>(slot));
+      if (!valid) {
+        parent_->parse_status_ = Status("Invalid bool column");
+        return false;
+      }
+      break;
+    }
     case TYPE_TINYINT:
       *reinterpret_cast<int8_t*>(slot) = *reinterpret_cast<int32_t*>(data_);
       data_ += 4;
@@ -352,6 +376,11 @@ inline bool HdfsParquetScanner::ColumnReader::ReadValue(MemPool* pool, Tuple* tu
       data_ += sv->len;
       break;
     }
+    case TYPE_TIMESTAMP:
+      // timestamp type is a 12 byte value.
+      memcpy(slot, data_, 12);
+      data_ += 12;
+      break;
     default:
       DCHECK(false);
   }
@@ -389,7 +418,7 @@ Status HdfsParquetScanner::AssembleRows() {
     int num_to_commit = 0;
     for (int i = 0; i < num_rows; ++i) {
-      InitTuple(template_tuple_, tuple);
+      InitTuple(context_->template_tuple(), tuple);

       for (int c = 0; c < column_readers_.size(); ++c) {
         if (!column_readers_[c]->ReadValue(pool, tuple)) {
           assemble_rows_timer_.Stop();
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 674e16345..34d347ec9 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -80,7 +80,6 @@ Status HdfsRCFileScanner::InitNewRange() {
   only_parsing_header_ = false;
   row_group_buffer_size_ = 0;

-  template_tuple_ = context_->template_tuple();

   if (header_->is_compressed) {
     RETURN_IF_ERROR(Codec::CreateDecompressor(state_,
@@ -482,7 +481,7 @@ Status HdfsRCFileScanner::ProcessRange() {
       // Initialize tuple from the partition key template tuple before writing the
       // slots
-      InitTuple(template_tuple_, tuple);
+      InitTuple(context_->template_tuple(), tuple);

       for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) {
         const SlotDescriptor* slot_desc = *it;
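The new TYPE_BOOLEAN path reads from bool_values_ rather than from data_ directly because Parquet's PLAIN encoding packs booleans one bit per value, least-significant bit first, so there is no byte-aligned value to copy. A rough sketch of the decode such a bit reader performs (hypothetical minimal reader, assuming LSB-first packing):

#include <cstdint>

// Hypothetical minimal LSB-first bit reader over a byte buffer, in the
// spirit of the BitReader used above; returns false when data runs out.
class MiniBitReader {
 public:
  MiniBitReader(const uint8_t* data, int len) : data_(data), len_(len) {}

  bool GetBool(bool* v) {
    int byte = bit_offset_ / 8;
    if (byte >= len_) return false;  // past end of page: corrupt input
    *v = (data_[byte] >> (bit_offset_ % 8)) & 1;  // LSB-first packing
    ++bit_offset_;
    return true;
  }

 private:
  const uint8_t* data_;
  int len_;
  int bit_offset_ = 0;
};

Returning false when the bits run out is what lets ReadValue turn a short page into the "Invalid bool column" parse error above instead of reading garbage.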
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index e332ca6ec..f62214589 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -182,6 +182,7 @@ class HdfsScanNode : public ScanNode {
   // Allocates and initialises template_tuple_ with any values from
   // the partition columns for the current scan range
+  // Returns NULL if there are no materialized partition keys.
   // TODO: cache the tuple template in the partition object.
   Tuple* InitTemplateTuple(RuntimeState* state, const std::vector<Expr*>& expr_values);
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index ccd76c89d..70bb371d2 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -36,7 +36,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
   uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);
   Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem);
-  Tuple* template_tuple = template_tuple_;
+  Tuple* template_tuple = context_->template_tuple();

   uint8_t error[slots_per_tuple];
   memset(error, 0, sizeof(error));
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index a382f5423..2aea6aa09 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -57,7 +57,6 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state)
     tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
     tuple_(NULL),
     num_errors_in_file_(0),
-    template_tuple_(NULL),
     has_noncompact_strings_(!scan_node->compact_data() &&
         !scan_node->tuple_desc()->string_slots().empty()),
     num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
@@ -114,7 +113,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
   }
   DCHECK_GT(num_tuples, 0);

-  if (template_tuple_ == NULL) {
+  if (context_->template_tuple() == NULL) {
     // No slots from partitions keys or slots. This is count(*). Just add the
     // number of rows to the batch.
     row_batch->AddRows(num_tuples);
@@ -124,7 +123,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
     int row_idx = row_batch->AddRow();
     TupleRow* current_row = row_batch->GetRow(row_idx);
-    current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
+    current_row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
     if (!ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, current_row)) {
       return 0;
     }
@@ -139,7 +138,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
       row_idx = row_batch->AddRow();
       DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX);
       TupleRow* current_row = row_batch->GetRow(row_idx);
-      current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
+      current_row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
       row_batch->CommitLastRow();
     }
   }
@@ -160,15 +159,15 @@ int HdfsScanner::WriteEmptyTuples(ScannerContext* context,
   }
   if (num_tuples == 0) return 0;

-  if (template_tuple_ == NULL) {
+  if (context_->template_tuple() == NULL) {
     return num_tuples;
   } else {
-    row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
+    row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
     if (!ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, row)) return 0;
     row = context->next_row(row);

     for (int n = 1; n < num_tuples; ++n) {
-      row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
+      row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
       row = context->next_row(row);
     }
   }
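Every call site above now fetches the template tuple from the ScannerContext at the point of use instead of caching it in a scanner member. The pattern itself is unchanged: copy the partition-key template into the output tuple, then materialize the file's slots over it. A toy illustration of that copy-then-overwrite idea (hypothetical types, not the real Tuple/InitTuple):

#include <cstring>

// Toy stand-ins for the real Tuple/InitTuple machinery.
struct ToyTuple { char bytes[64]; };

// Start from the template (partition key slots pre-set, the rest zeroed),
// so only the file's materialized slots need to be written afterwards.
void InitToyTuple(const ToyTuple* tmpl, ToyTuple* dst) {
  if (tmpl == NULL) {
    memset(dst, 0, sizeof(*dst));   // no partition keys: just zero the tuple
  } else {
    memcpy(dst, tmpl, sizeof(*dst));  // inherit partition key values
  }
}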
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 5fd938a5c..1c023485f 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -156,15 +156,6 @@ class HdfsScanner {
   // Helper class for converting text to other types;
   boost::scoped_ptr<TextConverter> text_converter_;

-  // A partially materialized tuple with only partition key slots set.
-  // The non-partition key slots are set to NULL. The template tuple
-  // must be copied into tuple_ before any of the other slots are
-  // materialized.
-  // Pointer is NULL if there are no partition key slots.
-  // This template tuple is computed once for each file and valid for
-  // the duration of that file.
-  Tuple* template_tuple_;
-
   // True if the descriptor of the tuple the scanner is writing has
   // string slots and we are not compacting data. This is used to decide
   // how to treat buffer memory that contains slot data.
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 090cba324..d0db4ba94 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -72,8 +72,6 @@ Status HdfsSequenceScanner::InitNewRange() {
   num_buffered_records_in_compressed_block_ = 0;

-  template_tuple_ = context_->template_tuple();
-
   SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
   if (seq_header->is_compressed) {
     // For record-compressed data we always want to copy since they tend to be
@@ -364,7 +362,7 @@ Status HdfsSequenceScanner::ProcessRange() {
     memset(errors, 0, sizeof(errors));

     add_row = WriteCompleteTuple(pool, &field_locations_[0], tuple_, tuple_row_mem,
-        template_tuple_, &errors[0], &error_in_row);
+        context_->template_tuple(), &errors[0], &error_in_row);

     if (UNLIKELY(error_in_row)) {
       ReportTupleParseError(&field_locations_[0], errors, 0);
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 17b858fc3..9e056cc20 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -124,7 +124,6 @@ void HdfsTextScanner::ResetScanner() {
   boundary_row_.Clear();
   delimited_text_parser_->ParserReset();

-  template_tuple_ = context_->template_tuple();
   partial_tuple_empty_ = true;
   byte_buffer_ptr_ = byte_buffer_end_ = NULL;
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index 9cb85334c..309489c06 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -36,7 +36,7 @@ const parquet::Type::type IMPALA_TO_PARQUET_TYPES[] = {
   parquet::Type::INT64,
   parquet::Type::FLOAT,
   parquet::Type::DOUBLE,
-  parquet::Type::INT96,
+  parquet::Type::INT96, // Timestamp
   parquet::Type::BYTE_ARRAY,
 };
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index c55c4de00..b73f3e005 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -355,7 +355,13 @@ class ScannerContext {
   HdfsPartitionDescriptor* partition_desc_;

-  // Template tuple for this scan range
+  // A partially materialized tuple with only partition key slots set.
+  // The non-partition key slots are set to NULL. The template tuple
+  // must be copied into tuple_ before any of the other slots are
+  // materialized.
+  // Pointer is NULL if there are no partition key slots.
+  // This template tuple is computed once for each file and valid for
+  // the duration of that file.
   Tuple* template_tuple_;

   // Current row batch that tuples are being written to.
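The TYPE_TIMESTAMP case added earlier copies the Parquet INT96 value into the slot with a raw 12-byte memcpy. That only works because the on-disk layout (assumed here to be 8 bytes of nanoseconds within the day followed by a 4-byte Julian day number, as Impala writes it) matches the in-memory timestamp layout, so no conversion step is needed. A sketch with that assumed layout spelled out (hypothetical struct, for illustration):

#include <cstdint>
#include <cstring>

// Assumed layout of an Impala-written Parquet INT96 timestamp:
// 8 bytes of nanoseconds within the day, then a 4-byte Julian day.
struct Int96Timestamp {
  int64_t nanos_of_day;
  int32_t julian_day;
};

Int96Timestamp DecodeInt96(const uint8_t* data) {
  Int96Timestamp ts;
  // Copy field by field: sizeof(Int96Timestamp) may be 16 due to
  // padding, but the wire value is exactly 12 bytes.
  memcpy(&ts.nanos_of_day, data, 8);
  memcpy(&ts.julian_day, data + 8, 4);
  return ts;
}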
diff --git a/be/src/util/compress.cc b/be/src/util/compress.cc
index 599032b77..339af6c14 100644
--- a/be/src/util/compress.cc
+++ b/be/src/util/compress.cc
@@ -92,6 +92,7 @@ Status GzipCompressor::ProcessBlock(int input_length, uint8_t* input,
       DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
       buffer_length_ = len;
       out_buffer_ = memory_pool_->Allocate(buffer_length_);
+      *output_length = buffer_length_;
     }
   }

@@ -234,6 +235,7 @@ Status SnappyCompressor::ProcessBlock(int input_length, uint8_t* input,
     buffer_length_ = max_compressed_len;
     out_buffer_ = memory_pool_->Allocate(buffer_length_);
     *output = out_buffer_;
+    *output_length = max_compressed_len;
   }
   return Compress(input_length, input, output_length, out_buffer_);
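Both compress.cc hunks fix the same contract bug: when the caller passes *output_length == 0, ProcessBlock allocates the output buffer from its memory pool but previously never reported the allocated size back, leaving the caller with a buffer of unknown length. A minimal sketch of the allocate-or-reuse convention being repaired (hypothetical standalone helper, malloc standing in for the mem pool):

#include <cstdint>
#include <cstdlib>

// Hypothetical standalone version of the allocate-or-reuse convention:
// *output_length == 0 means "allocate for me", and the fix is reporting
// the allocated size back to the caller before returning.
void GetOutputBuffer(int needed_len, int* output_length, uint8_t** output) {
  if (*output_length == 0) {
    *output = static_cast<uint8_t*>(malloc(needed_len));
    *output_length = needed_len;  // the line the patch adds, in spirit
  }
  // Otherwise the caller supplied a buffer of *output_length bytes.
}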