Fix bugs in parquet scanner.

Author: Nong Li
Date: 2013-04-11 12:03:41 -07:00
Committed by: Henry Robinson
parent 3093006ca9
commit 0dcfbfafed
12 changed files with 54 additions and 31 deletions


@@ -261,7 +261,6 @@ Status HdfsAvroScanner::InitNewRange() {
DCHECK(header_ != NULL);
only_parsing_header_ = false;
avro_header_ = reinterpret_cast<AvroFileHeader*>(header_);
template_tuple_ = context_->template_tuple();
if (header_->is_compressed) {
RETURN_IF_ERROR(Codec::CreateDecompressor(state_,
data_buffer_pool_.get(), stream_->compact_data(),
@@ -343,7 +342,7 @@ Status HdfsAvroScanner::DecodeAvroData(int max_tuples, int64_t* num_records,
for (int i = 0; i < n; ++i) {
// Initialize tuple from the partition key template tuple before writing the
// slots
InitTuple(template_tuple_, tuple);
InitTuple(context_->template_tuple(), tuple);
// Decode record
for (int j = 0; j < slot_descs_.size(); ++j) {


@@ -133,6 +133,9 @@ class HdfsParquetScanner::ColumnReader {
// Pointer to start of next value in data page
uint8_t* data_;
// Decoder for bool values. Only valid if type is TYPE_BOOLEAN
BitReader bool_values_;
// Decoder for definition. Only one of these is valid at a time, depending on
// the data page metadata.
RleDecoder rle_def_levels_;
@@ -178,6 +181,9 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
// now complete, pass along the memory allocated for it.
parent_->context_->AcquirePool(decompressed_data_pool_.get());
// Read the next data page, skipping page types we don't care about.
// We break out of this loop on the non-error case (either eosr or
// a data page was found).
while (true) {
DCHECK_EQ(num_buffered_values_, 0);
RETURN_IF_ERROR(stream_->GetRawBytes(&buffer, &num_bytes, &eos));
@@ -240,6 +246,7 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
reinterpret_cast<char*>(decompressed_buffer));
if (!success) return Status("Corrupt data page");
data_ = decompressed_buffer;
data_size = current_page_header_.uncompressed_page_size;
} else {
// TODO: handle the other codecs.
DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
@@ -249,8 +256,9 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
int32_t num_definition_bytes = 0;
switch (current_page_header_.data_page_header.definition_level_encoding) {
case parquet::Encoding::RLE:
num_definition_bytes = *reinterpret_cast<int32_t*>(data_);
data_ += sizeof(int32_t);
if (!ReadWriteUtil::Read(&data_, &data_size, &num_definition_bytes, &status)) {
return status;
}
rle_def_levels_ = RleDecoder(data_, num_definition_bytes);
break;
case parquet::Encoding::BIT_PACKED:
@@ -266,10 +274,17 @@ Status HdfsParquetScanner::ColumnReader::ReadDataPage() {
}
DCHECK_GT(num_definition_bytes, 0);
data_ += num_definition_bytes;
data_size -= num_definition_bytes;
if (desc_->type() == TYPE_BOOLEAN) {
// Initialize bool decoder
bool_values_ = BitReader(data_, data_size);
}
break;
}
return status;
return Status::OK;
}
// TODO More codegen here as well.
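Editor's note: the definition-level fix above replaces a raw pointer dereference with a bounds-checked read of the 4-byte RLE length prefix and starts tracking the remaining page bytes in data_size, so a truncated or corrupt page surfaces an error instead of reading past the buffer. A minimal standalone sketch of that pattern (the ReadLeInt32 helper is hypothetical, not Impala's ReadWriteUtil::Read):

#include <cstdint>
#include <cstring>

// Hypothetical helper: read a little-endian int32 length prefix from *data,
// advancing the pointer and shrinking the remaining byte count. Returns false
// (instead of reading out of bounds) when fewer than 4 bytes remain.
static bool ReadLeInt32(uint8_t** data, int* remaining, int32_t* value) {
  if (*remaining < static_cast<int>(sizeof(int32_t))) return false;
  memcpy(value, *data, sizeof(int32_t));  // memcpy avoids an unaligned dereference
  *data += sizeof(int32_t);
  *remaining -= sizeof(int32_t);
  return true;
}
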
@@ -315,6 +330,15 @@ inline bool HdfsParquetScanner::ColumnReader::ReadValue(MemPool* pool, Tuple* tu
void* slot = tuple->GetSlot(desc_->tuple_offset());
switch (desc_->type()) {
case TYPE_BOOLEAN: {
bool valid;
valid = bool_values_.GetBool(reinterpret_cast<bool*>(slot));
if (!valid) {
parent_->parse_status_ = Status("Invalid bool column");
return false;
}
break;
}
case TYPE_TINYINT:
*reinterpret_cast<int8_t*>(slot) = *reinterpret_cast<int32_t*>(data_);
data_ += 4;
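Editor's note: the TYPE_BOOLEAN case above switches to a bit-level decoder because Parquet's PLAIN encoding packs booleans one bit per value rather than one byte. A rough sketch of such a reader, assuming the LSB-first bit order Parquet uses (SimpleBitReader is illustrative only, far simpler than the BitReader class referenced in the diff):

#include <cstdint>

// Hypothetical bit reader: returns one packed boolean per call, least
// significant bit first within each byte, and reports failure once the
// buffer is exhausted.
class SimpleBitReader {
 public:
  SimpleBitReader(const uint8_t* data, int len)
    : data_(data), len_(len), bit_(0), byte_(0) {}

  bool GetBool(bool* value) {
    if (byte_ >= len_) return false;
    *value = (data_[byte_] >> bit_) & 1;
    if (++bit_ == 8) { bit_ = 0; ++byte_; }
    return true;
  }

 private:
  const uint8_t* data_;
  int len_;
  int bit_;
  int byte_;
};
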
@@ -352,6 +376,11 @@ inline bool HdfsParquetScanner::ColumnReader::ReadValue(MemPool* pool, Tuple* tu
data_ += sv->len;
break;
}
case TYPE_TIMESTAMP:
// timestamp type is a 12 byte value.
memcpy(slot, data_, 12);
data_ += 12;
break;
default:
DCHECK(false);
}
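Editor's note: the new TYPE_TIMESTAMP case copies a fixed 12-byte value, matching the parquet::Type::INT96 mapping annotated later in this commit. The field layout sketched below (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number) is the common INT96 timestamp convention and is shown only as an assumption for illustration, not as part of the change:

#include <cstdint>
#include <cstring>

// Sketch of a 12-byte (INT96) Parquet timestamp: nanoseconds-of-day plus a
// Julian day number. Fields are copied individually to sidestep struct padding.
struct Int96Timestamp {
  int64_t nanos_of_day;
  int32_t julian_day;
};

static void DecodeInt96Timestamp(const uint8_t* data, Int96Timestamp* ts) {
  memcpy(&ts->nanos_of_day, data, sizeof(int64_t));
  memcpy(&ts->julian_day, data + sizeof(int64_t), sizeof(int32_t));
}
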
@@ -389,7 +418,7 @@ Status HdfsParquetScanner::AssembleRows() {
int num_to_commit = 0;
for (int i = 0; i < num_rows; ++i) {
InitTuple(template_tuple_, tuple);
InitTuple(context_->template_tuple(), tuple);
for (int c = 0; c < column_readers_.size(); ++c) {
if (!column_readers_[c]->ReadValue(pool, tuple)) {
assemble_rows_timer_.Stop();
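Editor's note: several hunks in this commit replace the scanner-owned template_tuple_ member with context_->template_tuple(). The template tuple is simply a tuple whose partition-key slots are pre-filled; it is copied into each output tuple before the materialized slots are written. A rough sketch of that copy-or-clear step (names and signature are illustrative, not Impala's exact InitTuple):

#include <cstring>

// Illustrative InitTuple pattern: if a template tuple with partition-key
// values exists, copy it wholesale so those slots and null bits are already
// set; otherwise just clear the null-indicator bytes.
static void InitTupleSketch(const void* template_tuple, void* tuple,
                            int tuple_byte_size, int num_null_bytes) {
  if (template_tuple != NULL) {
    memcpy(tuple, template_tuple, tuple_byte_size);
  } else {
    memset(tuple, 0, num_null_bytes);
  }
}
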


@@ -80,7 +80,6 @@ Status HdfsRCFileScanner::InitNewRange() {
only_parsing_header_ = false;
row_group_buffer_size_ = 0;
template_tuple_ = context_->template_tuple();
if (header_->is_compressed) {
RETURN_IF_ERROR(Codec::CreateDecompressor(state_,
@@ -482,7 +481,7 @@ Status HdfsRCFileScanner::ProcessRange() {
// Initialize tuple from the partition key template tuple before writing the
// slots
InitTuple(template_tuple_, tuple);
InitTuple(context_->template_tuple(), tuple);
for (it = materialized_slots.begin(); it != materialized_slots.end(); ++it) {
const SlotDescriptor* slot_desc = *it;


@@ -182,6 +182,7 @@ class HdfsScanNode : public ScanNode {
// Allocates and initialises template_tuple_ with any values from
// the partition columns for the current scan range
// Returns NULL if there are no materialized partition keys.
// TODO: cache the tuple template in the partition object.
Tuple* InitTemplateTuple(RuntimeState* state, const std::vector<Expr*>& expr_values);


@@ -36,7 +36,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_
uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem);
Tuple* template_tuple = template_tuple_;
Tuple* template_tuple = context_->template_tuple();
uint8_t error[slots_per_tuple];
memset(error, 0, sizeof(error));


@@ -57,7 +57,6 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state)
tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
tuple_(NULL),
num_errors_in_file_(0),
template_tuple_(NULL),
has_noncompact_strings_(!scan_node->compact_data() &&
!scan_node->tuple_desc()->string_slots().empty()),
num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()),
@@ -114,7 +113,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
}
DCHECK_GT(num_tuples, 0);
if (template_tuple_ == NULL) {
if (context_->template_tuple() == NULL) {
// No slots from partitions keys or slots. This is count(*). Just add the
// number of rows to the batch.
row_batch->AddRows(num_tuples);
@@ -124,7 +123,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
int row_idx = row_batch->AddRow();
TupleRow* current_row = row_batch->GetRow(row_idx);
current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
current_row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
if (!ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, current_row)) {
return 0;
}
@@ -139,7 +138,7 @@ int HdfsScanner::WriteEmptyTuples(RowBatch* row_batch, int num_tuples) {
row_idx = row_batch->AddRow();
DCHECK(row_idx != RowBatch::INVALID_ROW_INDEX);
TupleRow* current_row = row_batch->GetRow(row_idx);
current_row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
current_row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
row_batch->CommitLastRow();
}
}
@@ -160,15 +159,15 @@ int HdfsScanner::WriteEmptyTuples(ScannerContext* context,
}
if (num_tuples == 0) return 0;
if (template_tuple_ == NULL) {
if (context_->template_tuple() == NULL) {
return num_tuples;
} else {
row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
if (!ExecNode::EvalConjuncts(conjuncts_, num_conjuncts_, row)) return 0;
row = context->next_row(row);
for (int n = 1; n < num_tuples; ++n) {
row->SetTuple(scan_node_->tuple_idx(), template_tuple_);
row->SetTuple(scan_node_->tuple_idx(), context_->template_tuple());
row = context->next_row(row);
}
}
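Editor's note: in the WriteEmptyTuples path above, every emitted row is identical (either empty or carrying only the partition-key template tuple), which is why the conjuncts are evaluated once and the remaining rows are stamped out without re-evaluation. A simplified sketch of that reasoning, using generic types rather than Impala's RowBatch API:

#include <vector>

// Sketch: when all rows carry the same (possibly NULL) template tuple, it is
// enough to evaluate the filter once; either every row passes or none does.
template <typename Tuple, typename Predicate>
int EmitIdenticalRows(std::vector<const Tuple*>* batch, const Tuple* template_tuple,
                      int num_rows, Predicate passes_conjuncts) {
  if (!passes_conjuncts(template_tuple)) return 0;
  for (int i = 0; i < num_rows; ++i) batch->push_back(template_tuple);
  return num_rows;
}
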


@@ -156,15 +156,6 @@ class HdfsScanner {
// Helper class for converting text to other types;
boost::scoped_ptr<TextConverter> text_converter_;
// A partially materialized tuple with only partition key slots set.
// The non-partition key slots are set to NULL. The template tuple
// must be copied into tuple_ before any of the other slots are
// materialized.
// Pointer is NULL if there are no partition key slots.
// This template tuple is computed once for each file and valid for
// the duration of that file.
Tuple* template_tuple_;
// True if the descriptor of the tuple the scanner is writing has
// string slots and we are not compacting data. This is used to decide
// how to treat buffer memory that contains slot data.


@@ -72,8 +72,6 @@ Status HdfsSequenceScanner::InitNewRange() {
num_buffered_records_in_compressed_block_ = 0;
template_tuple_ = context_->template_tuple();
SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_);
if (seq_header->is_compressed) {
// For record-compressed data we always want to copy since they tend to be
@@ -364,7 +362,7 @@ Status HdfsSequenceScanner::ProcessRange() {
memset(errors, 0, sizeof(errors));
add_row = WriteCompleteTuple(pool, &field_locations_[0], tuple_, tuple_row_mem,
template_tuple_, &errors[0], &error_in_row);
context_->template_tuple(), &errors[0], &error_in_row);
if (UNLIKELY(error_in_row)) {
ReportTupleParseError(&field_locations_[0], errors, 0);


@@ -124,7 +124,6 @@ void HdfsTextScanner::ResetScanner() {
boundary_row_.Clear();
delimited_text_parser_->ParserReset();
template_tuple_ = context_->template_tuple();
partial_tuple_empty_ = true;
byte_buffer_ptr_ = byte_buffer_end_ = NULL;


@@ -36,7 +36,7 @@ const parquet::Type::type IMPALA_TO_PARQUET_TYPES[] = {
parquet::Type::INT64,
parquet::Type::FLOAT,
parquet::Type::DOUBLE,
parquet::Type::INT96,
parquet::Type::INT96, // Timestamp
parquet::Type::BYTE_ARRAY,
};


@@ -355,7 +355,13 @@ class ScannerContext {
HdfsPartitionDescriptor* partition_desc_;
// Template tuple for this scan range
// A partially materialized tuple with only partition key slots set.
// The non-partition key slots are set to NULL. The template tuple
// must be copied into tuple_ before any of the other slots are
// materialized.
// Pointer is NULL if there are no partition key slots.
// This template tuple is computed once for each file and valid for
// the duration of that file.
Tuple* template_tuple_;
// Current row batch that tuples are being written to.


@@ -92,6 +92,7 @@ Status GzipCompressor::ProcessBlock(int input_length, uint8_t* input,
DCHECK(memory_pool_ != NULL) << "Can't allocate without passing in a mem pool";
buffer_length_ = len;
out_buffer_ = memory_pool_->Allocate(buffer_length_);
*output_length = buffer_length_;
}
}
@@ -234,6 +235,7 @@ Status SnappyCompressor::ProcessBlock(int input_length, uint8_t* input,
buffer_length_ = max_compressed_len;
out_buffer_ = memory_pool_->Allocate(buffer_length_);
*output = out_buffer_;
*output_length = max_compressed_len;
}
return Compress(input_length, input, output_length, out_buffer_);
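Editor's note: the two compressor hunks above add the missing *output_length assignment for the case where ProcessBlock sizes its own output buffer; without it the caller could see a stale or zero length. For the Snappy case the upper bound comes from snappy::MaxCompressedLength() and the true size from the compression call itself, roughly as in this standalone sketch (public snappy API, not Impala's Codec wrapper; SnappyCompressSketch is a hypothetical name):

#include <snappy.h>
#include <cstdint>
#include <vector>

// Sketch: compress into a buffer sized by MaxCompressedLength(), and make sure
// the caller learns both the buffer used and how many bytes in it are valid.
static bool SnappyCompressSketch(const uint8_t* input, size_t input_len,
                                 std::vector<char>* out_buffer, size_t* output_length) {
  out_buffer->resize(snappy::MaxCompressedLength(input_len));  // worst-case size
  size_t compressed_len = 0;
  snappy::RawCompress(reinterpret_cast<const char*>(input), input_len,
                      out_buffer->data(), &compressed_len);
  out_buffer->resize(compressed_len);
  *output_length = compressed_len;  // the assignment the fix adds, in spirit
  return true;
}
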