IMPALA-6503: Support reading complex types from ORC

We already support reading primitive types from ORC files (IMPALA-5717).
In this patch we add support for complex types (struct/array/map).

In IMPALA-5717, we leveraged the ORC lib to parse ORC binaries (data in
I/O buffers read from DiskIoMgr). The ORC lib materializes ORC column
binaries into its own representation (orc::ColumnVectorBatch), and
hdfs-orc-scanner then transforms the values in orc::ColumnVectorBatch
into impala::Tuples. We don't need to handle decoding/decompression
ourselves since both are done by the ORC lib. Fortunately, the ORC lib
already supports complex types, so we can keep leveraging it for this patch.
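
As a point of reference, here is a minimal, self-contained sketch of that
ORC C++ API flow outside Impala. It assumes a local file 'example.orc'
whose first column is a BIGINT; the file name and column layout are made
up for illustration.

  #include <iostream>
  #include <memory>
  #include <orc/OrcFile.hh>

  int main() {
    // orc::Reader parses the file footer/metadata; orc::RowReader
    // materializes rows into an orc::ColumnVectorBatch.
    std::unique_ptr<orc::Reader> reader = orc::createReader(
        orc::readLocalFile("example.orc"), orc::ReaderOptions());
    std::unique_ptr<orc::RowReader> row_reader =
        reader->createRowReader(orc::RowReaderOptions());
    std::unique_ptr<orc::ColumnVectorBatch> batch =
        row_reader->createRowBatch(1024);
    // Decoding/decompression happen inside the ORC lib; we only see the
    // materialized batch.
    while (row_reader->next(*batch)) {
      auto& root = static_cast<orc::StructVectorBatch&>(*batch);
      auto& col0 = static_cast<orc::LongVectorBatch&>(*root.fields[0]);
      for (uint64_t i = 0; i < batch->numElements; ++i) {
        if (col0.hasNulls && !col0.notNull[i]) continue;  // NULL value
        std::cout << col0.data.data()[i] << "\n";
      }
    }
    return 0;
  }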

What we need to add in IMPALA-6503 are two things:
1. Specify which nested columns we need in the form required by the ORC
  lib, i.e. derive the list of ORC type ids from the tuple descriptors
  (see the sketch after this list).
2. Transform the output of the ORC lib (nested orc::ColumnVectorBatch)
  into Impala's representation (Slots/Tuples/RowBatches).
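
The sketch below makes item 1 concrete using the ORC C++ API: every node
in the ORC type tree gets a pre-order id (the root struct is 0), and
orc::RowReaderOptions::includeTypes() takes the list of ids to read. The
schema struct<id:bigint,tags:array<string>> and the ids shown are
assumptions made up for this example.

  #include <list>
  #include <memory>
  #include <orc/OrcFile.hh>

  std::unique_ptr<orc::RowReader> SelectNestedColumns(orc::Reader* reader) {
    // Pre-order ids for struct<id:bigint,tags:array<string>>:
    //   0: root struct, 1: id, 2: tags (LIST), 3: tags' item (STRING)
    std::list<uint64_t> selected_type_ids = {1, 3};  // 'id' and the array items
    orc::RowReaderOptions opts;
    opts.includeTypes(selected_type_ids);
    std::unique_ptr<orc::RowReader> row_reader = reader->createRowReader(opts);
    // Selected nodes stay connected as a tree: selecting a node also selects
    // its ancestors and children, so id 2 ('tags') is included implicitly.
    const orc::Type& selected_root = row_reader->getSelectedType();
    (void)selected_root;  // a subtree of reader->getType()
    return row_reader;
  }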

To perform the materialization, we implement several ORC column readers
in hdfs-orc-scanner. Each kind of reader handles one column type and
transforms the output of the ORC lib into tuple/slot values.
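
A trimmed-down sketch of that reader pattern follows. The real classes
added by this patch live in exec/orc-column-readers.h/.cc and write into
impala::Tuple slots; the simplified interface and names below are
illustrative only.

  #include <cstdint>
  #include <orc/Vector.hh>

  // Abstract reader: one instance per selected ORC column.
  class OrcColumnReaderSketch {
   public:
    virtual ~OrcColumnReaderSketch() {}
    // Point the reader at the batch most recently filled by
    // orc::RowReader::next().
    virtual void UpdateInputBatch(orc::ColumnVectorBatch* batch) = 0;
    // Copy the 'row_idx'-th value of the current batch into the output slot.
    virtual void ReadValue(int row_idx, void* slot) = 0;
  };

  // Reader for a BIGINT column backed by orc::LongVectorBatch.
  class BigIntReaderSketch : public OrcColumnReaderSketch {
   public:
    void UpdateInputBatch(orc::ColumnVectorBatch* batch) override {
      batch_ = static_cast<orc::LongVectorBatch*>(batch);
    }
    void ReadValue(int row_idx, void* slot) override {
      *reinterpret_cast<int64_t*>(slot) = batch_->data.data()[row_idx];
    }
   private:
    orc::LongVectorBatch* batch_ = nullptr;
  };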

Tests:
* Enable existing tests for complex types (test_nested_types.py,
test_tpch_nested_queries.py) for ORC.
* Run exhaustive tests in DEBUG and RELEASE builds.

Change-Id: I244dc9d2b3e425393f90e45632cb8cdbea6cf790
Reviewed-on: http://gerrit.cloudera.org:8080/12168
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>

Author: stiga-huang
Date: 2018-12-01 22:56:42 +08:00
Committer: Impala Public Jenkins
Parent: 214f61a180
Commit: 9686545bfd
30 changed files with 1943 additions and 466 deletions

be/src/exec/CMakeLists.txt

@@ -70,6 +70,8 @@ add_library(Exec
nested-loop-join-node.cc
non-grouping-aggregator.cc
non-grouping-aggregator-ir.cc
orc-column-readers.cc
orc-metadata-utils.cc
partial-sort-node.cc
partitioned-hash-join-builder.cc
partitioned-hash-join-builder-ir.cc

be/src/exec/hdfs-orc-scanner.cc

@@ -19,8 +19,10 @@
#include <queue>
#include "exec/orc-column-readers.h"
#include "exec/scanner-context.inline.h"
#include "exprs/expr.h"
#include "runtime/collection-value-builder.h"
#include "runtime/exec-env.h"
#include "runtime/io/request-context.h"
#include "runtime/runtime-filter.inline.h"
@@ -170,8 +172,32 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
context_->ReleaseCompletedResources(true);
RETURN_IF_ERROR(footer_status);
// Update orc reader options base on the tuple descriptor
RETURN_IF_ERROR(SelectColumns(scan_node_->tuple_desc()));
schema_resolver_.reset(new OrcSchemaResolver(*scan_node_->hdfs_table(),
&reader_->getType(), filename()));
// Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
// columns we don't need.
RETURN_IF_ERROR(SelectColumns(*scan_node_->tuple_desc()));
// Build 'col_id_path_map_' that maps from ORC column ids to their corresponding
// SchemaPath in the table. The map is used in the constructors of OrcColumnReaders
// where we resolve SchemaPaths of the descriptors.
OrcMetadataUtils::BuildSchemaPaths(reader_->getType(),
scan_node_->num_partition_keys(), &col_id_path_map_);
// To create OrcColumnReaders, we need the selected orc schema. It's a subset of the
// file schema: a tree of selected orc types that can only be obtained from an
// orc::RowReader (by orc::RowReader::getSelectedType).
// (by orc::RowReader::getSelectedType).
// Selected nodes are still connected as a tree since if a node is selected, all its
// ancestors and children will be selected too.
// Here we haven't read stripe data yet so no orc::RowReaders are created. To get the
// selected types we create a temp orc::RowReader (but won't read rows from it).
unique_ptr<orc::RowReader> tmp_row_reader =
reader_->createRowReader(row_reader_options_);
const orc::Type* root_type = &tmp_row_reader->getSelectedType();
DCHECK_EQ(root_type->getKind(), orc::TypeKind::STRUCT);
orc_root_reader_ = this->obj_pool_.Add(
new OrcStructReader(root_type, scan_node_->tuple_desc(), this));
// Set top-level template tuple.
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
@@ -191,7 +217,7 @@ void HdfsOrcScanner::Close(RowBatch* row_batch) {
template_tuple_pool_->FreeAll();
context_->ReleaseCompletedResources(true);
}
scratch_batch_.reset(nullptr);
orc_root_batch_.reset(nullptr);
// Verify all resources (if any) have been transferred.
DCHECK_EQ(template_tuple_pool_->total_allocated_bytes(), 0);
@@ -254,105 +280,141 @@ inline THdfsCompression::type HdfsOrcScanner::TranslateCompressionKind(
return THdfsCompression::DEFAULT;
}
Status HdfsOrcScanner::SelectColumns(const TupleDescriptor* tuple_desc) {
list<uint64_t> selected_indices;
int num_columns = 0;
const orc::Type& root_type = reader_->getType();
// TODO validate columns. e.g. scale of decimal type
for (SlotDescriptor* slot_desc: tuple_desc->slots()) {
// Skip partition columns
if (slot_desc->col_pos() < scan_node_->num_partition_keys()) continue;
bool HdfsOrcScanner::IsPartitionKeySlot(const SlotDescriptor* slot) {
return slot->parent() == scan_node_->tuple_desc() &&
slot->col_pos() < scan_node_->num_partition_keys();
}
const SchemaPath &path = slot_desc->col_path();
DCHECK_EQ(path.size(), 1);
int col_idx = path[0];
// The first index in a path includes the table's partition keys
int col_idx_in_file = col_idx - scan_node_->num_partition_keys();
if (col_idx_in_file >= root_type.getSubtypeCount()) {
// In this case, we are selecting a column that is not in the file.
bool HdfsOrcScanner::IsMissingField(const SlotDescriptor* slot) {
return missing_field_slots_.find(slot) != missing_field_slots_.end();
}
Status HdfsOrcScanner::ResolveColumns(const TupleDescriptor& tuple_desc,
list<const orc::Type*>* selected_nodes, stack<const SlotDescriptor*>* pos_slots) {
const orc::Type* node = nullptr;
bool pos_field = false;
bool missing_field = false;
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(tuple_desc.tuple_path(), &node,
&pos_field, &missing_field));
if (missing_field) {
return Status(Substitute("Could not find nested column '$0' in file '$1'.",
PrintPath(*scan_node_->hdfs_table(), tuple_desc.tuple_path()), filename()));
}
if (tuple_desc.byte_size() == 0) {
// Don't need to materialize any slots but just generate an empty tuple for each
// (collection) row. (E.g. count(*) or 'exists' on results of subquery).
// Due to ORC-450 we can't get the number of tuples inside a collection without
// reading its items (or subcolumn of its items). So we select the most inner
// subcolumn of the collection (get by orc::Type::getMaximumColumnId()). E.g.
// if 'node' is array<struct<c1:int,c2:int,c3:int>> and we just need the array
// lengths. We still need to read at least one subcolumn otherwise the ORC lib
// will skip the whole array column. So we select 'c3' for this case.
selected_type_ids_.push_back(node->getMaximumColumnId());
VLOG(3) << "Add ORC column " << node->getMaximumColumnId() << " for empty tuple "
<< PrintPath(*scan_node_->hdfs_table(), tuple_desc.tuple_path());
return Status::OK();
}
// Each tuple can have at most one position slot.
SlotDescriptor* pos_slot_desc = nullptr;
for (SlotDescriptor* slot_desc : tuple_desc.slots()) {
// Skip partition columns
if (IsPartitionKeySlot(slot_desc)) continue;
node = nullptr;
pos_field = false;
missing_field = false;
// Reminder: slot_desc->col_path() can be much deeper than tuple_desc.tuple_path()
// to reference to a deep subcolumn.
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(
slot_desc->col_path(), &node, &pos_field, &missing_field));
if (missing_field) {
if (slot_desc->type().IsCollectionType()) {
// If the collection column is missing, the whole scan range should return 0 rows
// since we're selecting children column(s) of the collection.
return Status(Substitute("Could not find nested column '$0' in file '$1'.",
PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path()), filename()));
}
// In this case, we are selecting a column/subcolumn that is not in the file.
// Update the template tuple to put a NULL in this slot.
Tuple** template_tuple = &template_tuple_map_[tuple_desc];
Tuple** template_tuple = &template_tuple_map_[&tuple_desc];
if (*template_tuple == nullptr) {
*template_tuple =
Tuple::Create(tuple_desc->byte_size(), template_tuple_pool_.get());
Tuple::Create(tuple_desc.byte_size(), template_tuple_pool_.get());
}
(*template_tuple)->SetNull(slot_desc->null_indicator_offset());
missing_field_slots_.insert(slot_desc);
continue;
}
selected_indices.push_back(col_idx_in_file);
const orc::Type* orc_type = root_type.getSubtype(col_idx_in_file);
const ColumnType& col_type = scan_node_->hdfs_table()->col_descs()[col_idx].type();
// TODO(IMPALA-6503): Support reading complex types from ORC format files
DCHECK(!col_type.IsComplexType()) << "Complex types are not supported yet";
RETURN_IF_ERROR(ValidateType(col_type, *orc_type));
col_id_slot_map_[orc_type->getColumnId()] = slot_desc;
++num_columns;
if (pos_field) {
DCHECK(pos_slot_desc == nullptr)
<< "There should only be one position slot per tuple";
pos_slot_desc = slot_desc;
pos_slots->push(pos_slot_desc);
DCHECK_EQ(node->getKind(), orc::TypeKind::LIST);
continue;
}
// 'col_path'(SchemaPath) of the SlotDescriptor won't map to a STRUCT column.
// We only deal with collection columns (ARRAY/MAP) and primitive columns here.
if (slot_desc->type().IsCollectionType()) {
// Recursively resolve nested columns
DCHECK(slot_desc->collection_item_descriptor() != nullptr);
const TupleDescriptor* item_tuple_desc = slot_desc->collection_item_descriptor();
RETURN_IF_ERROR(ResolveColumns(*item_tuple_desc, selected_nodes, pos_slots));
} else {
VLOG(3) << "Add ORC column " << node->getColumnId() << " for "
<< PrintPath(*scan_node_->hdfs_table(), slot_desc->col_path());
selected_nodes->push_back(node);
}
}
COUNTER_SET(num_cols_counter_, static_cast<int64_t>(num_columns));
row_reader_options.include(selected_indices);
return Status::OK();
}
Status HdfsOrcScanner::ValidateType(const ColumnType& type, const orc::Type& orc_type) {
switch (orc_type.getKind()) {
case orc::TypeKind::BOOLEAN:
if (type.type == TYPE_BOOLEAN) return Status::OK();
break;
case orc::TypeKind::BYTE:
if (type.type == TYPE_TINYINT || type.type == TYPE_SMALLINT
|| type.type == TYPE_INT || type.type == TYPE_BIGINT)
return Status::OK();
break;
case orc::TypeKind::SHORT:
if (type.type == TYPE_SMALLINT || type.type == TYPE_INT
|| type.type == TYPE_BIGINT)
return Status::OK();
break;
case orc::TypeKind::INT:
if (type.type == TYPE_INT || type.type == TYPE_BIGINT) return Status::OK();
break;
case orc::TypeKind::LONG:
if (type.type == TYPE_BIGINT) return Status::OK();
break;
case orc::TypeKind::FLOAT:
case orc::TypeKind::DOUBLE:
if (type.type == TYPE_FLOAT || type.type == TYPE_DOUBLE) return Status::OK();
break;
case orc::TypeKind::STRING:
case orc::TypeKind::VARCHAR:
case orc::TypeKind::CHAR:
if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR
|| type.type == TYPE_CHAR)
return Status::OK();
break;
case orc::TypeKind::TIMESTAMP:
if (type.type == TYPE_TIMESTAMP) return Status::OK();
break;
case orc::TypeKind::DECIMAL: {
if (type.type != TYPE_DECIMAL || type.scale != orc_type.getScale()) break;
bool overflow = false;
int orc_precision = orc_type.getPrecision();
if (orc_precision == 0 || orc_precision > ColumnType::MAX_DECIMAL8_PRECISION) {
// For ORC decimals whose precision is larger than 18, its value can't fit into
// an int64 (10^19 > 2^63). So we should use int128 (16 bytes) for this case.
// The possible byte sizes for Impala decimals are 4, 8, 16.
// We mark it as overflow if the target byte size is not 16.
overflow = (type.GetByteSize() != 16);
} else if (orc_type.getPrecision() > ColumnType::MAX_DECIMAL4_PRECISION) {
// For ORC decimals whose precision <= 18 and > 9, int64 and int128 can fit them.
// We only mark it as overflow if the target byte size is 4.
overflow = (type.GetByteSize() == 4);
}
if (!overflow) return Status::OK();
return Status(Substitute(
"It can't be truncated to table column $2 for column $0 in ORC file '$1'",
orc_type.toString(), filename(), type.DebugString()));
}
default: break;
/// Whether 'selected_type_ids' contains the id of any children of 'node'
bool HasChildrenSelected(const orc::Type& node,
const list<uint64_t>& selected_type_ids) {
for (uint64_t id : selected_type_ids) {
if (id >= node.getColumnId() && id <= node.getMaximumColumnId()) return true;
}
return Status(Substitute(
"Type mismatch: table column $0 is map to column $1 in ORC file '$2'",
type.DebugString(), orc_type.toString(), filename()));
return false;
}
Status HdfsOrcScanner::SelectColumns(const TupleDescriptor& tuple_desc) {
list<const orc::Type*> selected_nodes;
stack<const SlotDescriptor*> pos_slots;
// Select columns for all non-position slots.
RETURN_IF_ERROR(ResolveColumns(tuple_desc, &selected_nodes, &pos_slots));
for (auto t : selected_nodes) selected_type_ids_.push_back(t->getColumnId());
// Select columns for array positions. Due to ORC-450 we can't materialize array
// offsets without materializing its items, so we should still select the item or any
// sub column of the item. To be simple, we choose the max column id in the subtree
// of the ARRAY node.
// We process the deeper position slots first since it may introduce an item column
// that can also serve the position slot of upper arrays. E.g. for 'array_col' as
// array<struct<c1:int,c2:int,c3:array<int>>>, if both 'array_col.pos' and
// 'array_col.item.c3.pos' are needed, we just need to select 'array_col.item.c3.item'
// in the ORC lib, then we get offsets(indices) of both the inner and outer arrays.
while (!pos_slots.empty()) {
const SlotDescriptor* pos_slot_desc = pos_slots.top();
pos_slots.pop();
const orc::Type* array_node = nullptr;
bool pos_field = false;
bool missing_field = false;
RETURN_IF_ERROR(schema_resolver_->ResolveColumn(pos_slot_desc->col_path(),
&array_node, &pos_field, &missing_field));
if (HasChildrenSelected(*array_node, selected_type_ids_)) continue;
selected_type_ids_.push_back(array_node->getMaximumColumnId());
VLOG(3) << "Add ORC column " << array_node->getMaximumColumnId() << " for "
<< PrintPath(*scan_node_->hdfs_table(), pos_slot_desc->col_path());
selected_nodes.push_back(array_node);
}
COUNTER_SET(num_cols_counter_, static_cast<int64_t>(selected_type_ids_.size()));
row_reader_options_.includeTypes(selected_type_ids_);
return Status::OK();
}
Status HdfsOrcScanner::ProcessSplit() {
@@ -401,15 +463,23 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
tuple_mem_ = nullptr;
tuple_ = nullptr;
// Transfer remaining tuples from the scratch batch.
if (ScratchBatchNotEmpty()) {
// Transfer remaining values in current orc batch. They are left in the previous call
// of 'TransferTuples' inside 'AssembleRows'. Since the orc batch has the same capacity
// as RowBatch's, the remaining values should be drained by one more round of calling
// 'TransferTuples' here.
if (!orc_root_reader_->EndOfBatch()) {
assemble_rows_timer_.Start();
RETURN_IF_ERROR(TransferScratchTuples(row_batch));
RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch));
assemble_rows_timer_.Stop();
if (row_batch->AtCapacity()) return Status::OK();
DCHECK_EQ(scratch_batch_tuple_idx_, scratch_batch_->numElements);
DCHECK(orc_root_reader_->EndOfBatch());
}
// Process next stripe if current stripe is drained. Each stripe will generate several
// orc batches. We only advance the stripe after processing the last batch.
// 'advance_stripe_' is updated in 'NextStripe', meaning the current stripe we advance
// to can be skipped. 'end_of_stripe_' marks whether the current stripe is drained. It's
// set to true in 'AssembleRows'.
while (advance_stripe_ || end_of_stripe_) {
context_->ReleaseCompletedResources(/* done */ true);
// Commit the rows to flush the row batch from the previous stripe
@@ -444,11 +514,6 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
return Status::OK();
}
inline bool HdfsOrcScanner::ScratchBatchNotEmpty() {
return scratch_batch_ != nullptr
&& scratch_batch_tuple_idx_ < scratch_batch_->numElements;
}
inline static bool CheckStripeOverlapsSplit(int64_t stripe_start, int64_t stripe_end,
int64_t split_start, int64_t split_end) {
return (split_start >= stripe_start && split_start < stripe_end) ||
@@ -505,9 +570,9 @@ Status HdfsOrcScanner::NextStripe() {
// TODO: check if this stripe can be skipped by stats. e.g. IMPALA-6505
COUNTER_ADD(num_stripes_counter_, 1);
row_reader_options.range(stripe->getOffset(), stripe_len);
row_reader_options_.range(stripe->getOffset(), stripe_len);
try {
row_reader_ = reader_->createRowReader(row_reader_options);
row_reader_ = reader_->createRowReader(row_reader_options_);
} catch (ResourceError& e) { // errors throw from the orc scanner
parse_status_ = e.GetStatus();
return parse_status_;
@@ -531,18 +596,19 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
if (!continue_execution) return Status::CancelledInternal("ORC scanner");
scratch_batch_tuple_idx_ = 0;
scratch_batch_ = row_reader_->createRowBatch(row_batch->capacity());
DCHECK_EQ(scratch_batch_->numElements, 0);
// We're going to free the previous batch. Clear the reference first.
orc_root_reader_->UpdateInputBatch(nullptr);
orc_root_batch_ = row_reader_->createRowBatch(row_batch->capacity());
DCHECK_EQ(orc_root_batch_->numElements, 0);
int64_t num_rows_read = 0;
while (continue_execution) { // one ORC scratch batch (ColumnVectorBatch) in a round
if (scratch_batch_tuple_idx_ == scratch_batch_->numElements) {
while (continue_execution) { // one ORC batch (ColumnVectorBatch) in a round
if (orc_root_reader_->EndOfBatch()) {
try {
if (!row_reader_->next(*scratch_batch_)) {
end_of_stripe_ = true;
break; // no more data to process
}
end_of_stripe_ |= !row_reader_->next(*orc_root_batch_);
orc_root_reader_->UpdateInputBatch(orc_root_batch_.get());
if (end_of_stripe_) break; // no more data to process
} catch (ResourceError& e) {
parse_status_ = e.GetStatus();
return parse_status_;
@@ -552,33 +618,42 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
eos_ = true;
return parse_status_;
}
if (scratch_batch_->numElements == 0) {
if (orc_root_batch_->numElements == 0) {
RETURN_IF_ERROR(CommitRows(0, row_batch));
end_of_stripe_ = true;
return Status::OK();
}
num_rows_read += scratch_batch_->numElements;
scratch_batch_tuple_idx_ = 0;
num_rows_read += orc_root_batch_->numElements;
}
RETURN_IF_ERROR(TransferScratchTuples(row_batch));
RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch));
if (row_batch->AtCapacity()) break;
continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
}
stripe_rows_read_ += num_rows_read;
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
// Merge Scanner-local counter into HdfsScanNode counter and reset.
COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_);
coll_items_read_counter_ = 0;
return Status::OK();
}
Status HdfsOrcScanner::TransferScratchTuples(RowBatch* dst_batch) {
Status HdfsOrcScanner::TransferTuples(OrcComplexColumnReader* coll_reader,
RowBatch* dst_batch) {
if (!coll_reader->MaterializeTuple()) {
// Top-level readers that are not materializing tuples will delegate the
// materialization to its unique child.
DCHECK_EQ(coll_reader->children().size(), 1);
OrcColumnReader* child = coll_reader->children()[0];
// Only complex type readers can be top-level readers.
DCHECK(child->IsComplexColumnReader());
return TransferTuples(static_cast<OrcComplexColumnReader*>(child), dst_batch);
}
const TupleDescriptor* tuple_desc = scan_node_->tuple_desc();
ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_->data();
int num_conjuncts = conjunct_evals_->size();
const orc::Type* root_type = &row_reader_->getSelectedType();
DCHECK_EQ(root_type->getKind(), orc::TypeKind::STRUCT);
DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
if (tuple_ == nullptr) RETURN_IF_ERROR(AllocateTupleMem(dst_batch));
int row_id = dst_batch->num_rows();
@@ -588,13 +663,10 @@ Status HdfsOrcScanner::TransferScratchTuples(RowBatch* dst_batch) {
Tuple* tuple = tuple_; // tuple_ is updated in CommitRows
// TODO(IMPALA-6506): codegen the runtime filter + conjunct evaluation loop
// TODO: transfer the scratch_batch_ column-by-column for batch, and then evaluate
// the predicates in later loop.
while (row_id < capacity && ScratchBatchNotEmpty()) {
DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
while (row_id < capacity && !coll_reader->EndOfBatch()) {
if (tuple_desc->byte_size() > 0) DCHECK_LT((void*)tuple, (void*)tuple_mem_end_);
InitTuple(tuple_desc, template_tuple_, tuple);
RETURN_IF_ERROR(ReadRow(static_cast<const orc::StructVectorBatch&>(*scratch_batch_),
scratch_batch_tuple_idx_++, root_type, tuple, dst_batch));
RETURN_IF_ERROR(coll_reader->TransferTuple(tuple, dst_batch->tuple_data_pool()));
row->SetTuple(scan_node_->tuple_idx(), tuple);
if (!EvalRuntimeFilters(row)) continue;
if (ExecNode::EvalConjuncts(conjunct_evals, num_conjuncts, row)) {
@@ -619,150 +691,64 @@ Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
return Status::OK();
}
inline Status HdfsOrcScanner::ReadRow(const orc::StructVectorBatch& batch, int row_idx,
const orc::Type* orc_type, Tuple* tuple, RowBatch* dst_batch) {
for (unsigned int c = 0; c < orc_type->getSubtypeCount(); ++c) {
orc::ColumnVectorBatch* col_batch = batch.fields[c];
const orc::Type* col_type = orc_type->getSubtype(c);
const SlotDescriptor* slot_desc = DCHECK_NOTNULL(
col_id_slot_map_[col_type->getColumnId()]);
if (col_batch->hasNulls && !col_batch->notNull[row_idx]) {
tuple->SetNull(slot_desc->null_indicator_offset());
continue;
}
void* slot_val_ptr = tuple->GetSlot(slot_desc->tuple_offset());
switch (col_type->getKind()) {
case orc::TypeKind::BOOLEAN: {
int64_t val = static_cast<const orc::LongVectorBatch*>(col_batch)->
data.data()[row_idx];
*(reinterpret_cast<bool*>(slot_val_ptr)) = (val != 0);
break;
}
case orc::TypeKind::BYTE:
case orc::TypeKind::SHORT:
case orc::TypeKind::INT:
case orc::TypeKind::LONG: {
const orc::LongVectorBatch* long_batch =
static_cast<const orc::LongVectorBatch*>(col_batch);
int64_t val = long_batch->data.data()[row_idx];
switch (slot_desc->type().type) {
case TYPE_TINYINT:
*(reinterpret_cast<int8_t*>(slot_val_ptr)) = val;
break;
case TYPE_SMALLINT:
*(reinterpret_cast<int16_t*>(slot_val_ptr)) = val;
break;
case TYPE_INT:
*(reinterpret_cast<int32_t*>(slot_val_ptr)) = val;
break;
case TYPE_BIGINT:
*(reinterpret_cast<int64_t*>(slot_val_ptr)) = val;
break;
default:
DCHECK(false) << "Illegal translation from impala type "
<< slot_desc->DebugString() << " to orc INT";
}
break;
}
case orc::TypeKind::FLOAT:
case orc::TypeKind::DOUBLE: {
double val =
static_cast<const orc::DoubleVectorBatch*>(col_batch)->data.data()[row_idx];
if (slot_desc->type().type == TYPE_FLOAT) {
*(reinterpret_cast<float*>(slot_val_ptr)) = val;
} else {
DCHECK_EQ(slot_desc->type().type, TYPE_DOUBLE);
*(reinterpret_cast<double*>(slot_val_ptr)) = val;
}
break;
}
case orc::TypeKind::STRING:
case orc::TypeKind::VARCHAR:
case orc::TypeKind::CHAR: {
auto str_batch = static_cast<const orc::StringVectorBatch*>(col_batch);
const char* src_ptr = str_batch->data.data()[row_idx];
int64_t src_len = str_batch->length.data()[row_idx];
int dst_len = slot_desc->type().len;
if (slot_desc->type().type == TYPE_CHAR) {
int unpadded_len = min(dst_len, static_cast<int>(src_len));
char* dst_char = reinterpret_cast<char*>(slot_val_ptr);
memcpy(dst_char, src_ptr, unpadded_len);
StringValue::PadWithSpaces(dst_char, dst_len, unpadded_len);
break;
}
StringValue* dst = reinterpret_cast<StringValue*>(slot_val_ptr);
if (slot_desc->type().type == TYPE_VARCHAR && src_len > dst_len) {
dst->len = dst_len;
} else {
dst->len = src_len;
}
// Space in the StringVectorBatch is allocated by reader_mem_pool_. It will be
// reused at next batch, so we allocate a new space for this string.
uint8_t* buffer = dst_batch->tuple_data_pool()->TryAllocate(dst->len);
if (buffer == nullptr) {
string details = Substitute("Could not allocate string buffer of $0 bytes "
"for ORC file '$1'.", dst->len, filename());
return scan_node_->mem_tracker()->MemLimitExceeded(
state_, details, dst->len);
}
dst->ptr = reinterpret_cast<char*>(buffer);
memcpy(dst->ptr, src_ptr, dst->len);
break;
}
case orc::TypeKind::TIMESTAMP: {
const orc::TimestampVectorBatch* ts_batch =
static_cast<const orc::TimestampVectorBatch*>(col_batch);
int64_t secs = ts_batch->data.data()[row_idx];
int64_t nanos = ts_batch->nanoseconds.data()[row_idx];
*reinterpret_cast<TimestampValue*>(slot_val_ptr) =
TimestampValue::FromUnixTimeNanos(secs, nanos, state_->local_time_zone());
break;
}
case orc::TypeKind::DECIMAL: {
// For decimals whose precision is larger than 18, its value can't fit into
// an int64 (10^19 > 2^63). So we should use int128 for this case.
if (col_type->getPrecision() == 0 || col_type->getPrecision() > 18) {
auto int128_batch = static_cast<const orc::Decimal128VectorBatch*>(col_batch);
orc::Int128 orc_val = int128_batch->values.data()[row_idx];
DCHECK_EQ(slot_desc->type().GetByteSize(), 16);
int128_t val = orc_val.getHighBits();
val <<= 64;
val |= orc_val.getLowBits();
// Use memcpy to avoid gcc generating unaligned instructions like movaps
// for int128_t. They will raise SegmentFault when addresses are not
// aligned to 16 bytes.
memcpy(slot_val_ptr, &val, sizeof(int128_t));
} else {
// Reminder: even decimal(1,1) is stored in int64 batch
auto int64_batch = static_cast<const orc::Decimal64VectorBatch*>(col_batch);
int64_t val = int64_batch->values.data()[row_idx];
switch (slot_desc->type().GetByteSize()) {
case 4:
reinterpret_cast<Decimal4Value*>(slot_val_ptr)->value() = val;
break;
case 8:
reinterpret_cast<Decimal8Value*>(slot_val_ptr)->value() = val;
break;
case 16:
reinterpret_cast<Decimal16Value*>(slot_val_ptr)->value() = val;
break;
default: DCHECK(false) << "invalidate byte size";
}
}
break;
}
case orc::TypeKind::LIST:
case orc::TypeKind::MAP:
case orc::TypeKind::STRUCT:
case orc::TypeKind::UNION:
default:
DCHECK(false) << slot_desc->type().DebugString() << " map to ORC column "
<< col_type->toString();
Status HdfsOrcScanner::AssembleCollection(
const OrcComplexColumnReader& complex_col_reader, int row_idx,
CollectionValueBuilder* coll_value_builder) {
int total_tuples = complex_col_reader.GetNumTuples(row_idx);
if (!complex_col_reader.MaterializeTuple()) {
// 'complex_col_reader' maps to a STRUCT or collection column of STRUCTs/collections
// and there's no need to materialize tuples at the current level. Delegate the
// materialization to the unique child reader.
DCHECK_EQ(complex_col_reader.children().size(), 1);
DCHECK(complex_col_reader.children()[0]->IsComplexColumnReader());
auto child_reader = reinterpret_cast<OrcComplexColumnReader*>(
complex_col_reader.children()[0]);
// We should give the child reader the boundary (offset and total tuples) of current
// collection
int child_batch_offset = complex_col_reader.GetChildBatchOffset(row_idx);
for (int i = 0; i < total_tuples; ++i) {
RETURN_IF_ERROR(AssembleCollection(*child_reader, child_batch_offset + i,
coll_value_builder));
}
return Status::OK();
}
DCHECK(complex_col_reader.IsCollectionReader());
auto coll_reader = reinterpret_cast<const OrcCollectionReader*>(&complex_col_reader);
const TupleDescriptor* tuple_desc = &coll_value_builder->tuple_desc();
Tuple* template_tuple = template_tuple_map_[tuple_desc];
const vector<ScalarExprEvaluator*>& evals =
conjunct_evals_map_[tuple_desc->id()];
int tuple_idx = 0;
while (!scan_node_->ReachedLimit() && !context_->cancelled()
&& tuple_idx < total_tuples) {
MemPool* pool;
Tuple* tuple;
TupleRow* row = nullptr;
int64_t num_rows;
// We're assembling item tuples into a CollectionValue
parse_status_ =
GetCollectionMemory(coll_value_builder, &pool, &tuple, &row, &num_rows);
if (UNLIKELY(!parse_status_.ok())) break;
// 'num_rows' can be very high if we're writing to a large CollectionValue. Limit
// the number of rows we read at one time so we don't spend too long in the
// 'num_rows' loop below before checking for cancellation or limit reached.
num_rows = min(
num_rows, static_cast<int64_t>(scan_node_->runtime_state()->batch_size()));
int num_to_commit = 0;
while (num_to_commit < num_rows && tuple_idx < total_tuples) {
InitTuple(tuple_desc, template_tuple, tuple);
RETURN_IF_ERROR(coll_reader->ReadChildrenValue(row_idx, tuple_idx++, tuple, pool));
if (ExecNode::EvalConjuncts(evals.data(), evals.size(), row)) {
tuple = next_tuple(tuple_desc->byte_size(), tuple);
++num_to_commit;
}
}
coll_value_builder->CommitTuples(num_to_commit);
}
coll_items_read_counter_ += tuple_idx;
return Status::OK();
}
}

be/src/exec/hdfs-orc-scanner.h

@@ -26,16 +26,34 @@
#include "runtime/runtime-state.h"
#include "exec/hdfs-scanner.h"
#include "exec/hdfs-scan-node.h"
#include "exec/orc-metadata-utils.h"
#include "util/runtime-profile-counters.h"
namespace impala {
struct HdfsFileDesc;
class OrcStructReader;
class OrcComplexColumnReader;
/// This scanner leverages the ORC library to parse ORC files located in HDFS. Data is
/// transformed into Impala in-memory representation, i.e. Tuples, RowBatches.
/// transformed into Impala in-memory representation (i.e. Tuples, RowBatches) by
/// different kinds of OrcColumnReaders.
///
/// Steps of how we create orc::Reader, orc::RowReader and OrcColumnReaders:
/// * 'ProcessFileTail' to create orc::Reader with OrcMemPool and ScanRangeInputStream
/// * Resolve TupleDescriptors to get a list of mapped orc::Types (a.k.a column/node).
/// Init 'row_reader_options_' with these selected type ids.
/// * Build a map 'col_id_path_map_' from each orc::Type id to a SchemaPath. Will be
/// used in creating OrcColumnReaders.
/// * Create temporary orc::RowReader with 'row_reader_options_' to get the selected
/// subset of the schema (a tree of the selected orc::Types, i.e. the local variable
/// 'root_type' in 'HdfsOrcScanner::Open').
/// * Create OrcColumnReaders recursively with 'root_type', 'col_id_path_map_' and
/// TupleDescriptors (HdfsOrcScanner::Open)
/// * At the beginning of processing a Stripe, we update 'row_reader_options_' to have
/// the range of the Stripe boundaries. Then create an orc::RowReader for this Stripe
/// (HdfsOrcScanner::NextStripe)
///
/// For the file format spec, see https://orc.apache.org/docs/spec-intro.html
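///
/// Illustration (the schema below is hypothetical and added here only as an example):
/// for a table column 'int_array' of type array<int>, the ORC type tree is numbered
/// in pre-order:
///   0: struct<...>      (file root)
///   1:   int_array: list
///   2:     item: int
/// Selecting the item column means putting type id 2 into 'row_reader_options_' (via
/// includeTypes); the ORC lib then also selects the ancestors (ids 1 and 0), so the
/// selected schema returned by orc::RowReader::getSelectedType() is still a tree.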
class HdfsOrcScanner : public HdfsScanner {
public:
/// Exception throws from the orc scanner to stop the orc::RowReader. It's used in
@@ -52,6 +70,8 @@ class HdfsOrcScanner : public HdfsScanner {
Status status_;
};
/// A wrapper of std::malloc and std::free to track the memory usage of the ORC lib.
/// Without this the ORC lib will use std::malloc and std::free directly.
class OrcMemPool : public orc::MemoryPool {
public:
OrcMemPool(HdfsOrcScanner* scanner);
@@ -61,13 +81,14 @@ class HdfsOrcScanner : public HdfsScanner {
void free(char* p) override;
void FreeAll();
private:
private:
HdfsOrcScanner* scanner_;
MemTracker* mem_tracker_;
boost::unordered_map<char*, uint64_t> chunk_sizes_;
};
/// A wrapper of DiskIoMgr to be used by the ORC lib.
class ScanRangeInputStream : public orc::InputStream {
public:
ScanRangeInputStream(HdfsOrcScanner* scanner) {
@@ -77,21 +98,23 @@ class HdfsOrcScanner : public HdfsScanner {
scanner->context_->partition_descriptor()->id(), filename_);
}
uint64_t getLength() const {
uint64_t getLength() const override {
return file_desc_->file_length;
}
uint64_t getNaturalReadSize() const {
uint64_t getNaturalReadSize() const override {
return ExecEnv::GetInstance()->disk_io_mgr()->max_buffer_size();
}
void read(void* buf, uint64_t length, uint64_t offset);
/// Read 'length' bytes from the file starting at 'offset' into the buffer starting
/// at 'buf'.
void read(void* buf, uint64_t length, uint64_t offset) override;
const std::string& getName() const {
const std::string& getName() const override {
return filename_;
}
private:
private:
HdfsOrcScanner* scanner_;
HdfsFileDesc* file_desc_;
std::string filename_;
@@ -110,14 +133,22 @@ class HdfsOrcScanner : public HdfsScanner {
virtual void Close(RowBatch* row_batch) override;
private:
friend class OrcColumnReader;
friend class OrcStringColumnReader;
friend class OrcTimestampReader;
friend class OrcComplexColumnReader;
friend class OrcCollectionReader;
friend class OrcStructReader;
friend class OrcListReader;
friend class OrcMapReader;
friend class HdfsOrcScannerTest;
/// Memory guard of the tuple_mem_
uint8_t* tuple_mem_end_ = nullptr;
/// Index of the current stripe being processed. Initialized to -1 which indicates
/// that we have not started processing the first stripe yet (GetNext() has not yet
/// been called).
/// Index of the current stripe being processed. Stripe in ORC is equivalent to
/// RowGroup in Parquet. Initialized to -1 which indicates that we have not started
/// processing the first stripe yet (GetNext() has not yet been called).
int32_t stripe_idx_ = -1;
/// Counts the number of rows processed for the current stripe.
@@ -136,28 +167,43 @@ class HdfsOrcScanner : public HdfsScanner {
/// Mem pool used in orc readers.
boost::scoped_ptr<OrcMemPool> reader_mem_pool_;
std::unique_ptr<OrcSchemaResolver> schema_resolver_ = nullptr;
/// orc::Reader's responsibility is to read the footer and metadata from an ORC file.
/// It creates orc::RowReader for further materialization. orc::RowReader is used for
/// reading rows from the file.
std::unique_ptr<orc::Reader> reader_ = nullptr;
std::unique_ptr<orc::RowReader> row_reader_ = nullptr;
/// Orc reader will write slot values into this scratch batch for top-level tuples.
/// See AssembleRows().
std::unique_ptr<orc::ColumnVectorBatch> scratch_batch_;
int scratch_batch_tuple_idx_ = 0;
/// ReaderOptions used to create orc::Reader.
orc::ReaderOptions reader_options_;
/// RowReaderOptions used to create orc::RowReader.
orc::RowReaderOptions row_reader_options;
orc::RowReaderOptions row_reader_options_;
/// Column id is the pre order id in orc::Type tree.
/// Map from column id to slot descriptor.
boost::unordered_map<int, const SlotDescriptor*> col_id_slot_map_;
/// Scratch batch updated in place by 'row_reader_' (reader from the ORC lib). Will be
/// consumed by 'orc_root_reader_' (column reader implemented by ourselves). See more
/// in 'AssembleRows'
std::unique_ptr<orc::ColumnVectorBatch> orc_root_batch_;
/// Scan range for the metadata.
/// The root column reader to transfer orc values into impala RowBatch. The root of
/// the ORC file schema is always in STRUCT type so we use OrcStructReader here.
/// Instead of using std::unique_ptr, this object is tracked in 'obj_pool_' to be
/// together with children readers.
OrcStructReader* orc_root_reader_ = nullptr;
/// Slot descriptors that don't match any columns of the ORC file. We'll set NULL in
/// these slots.
std::unordered_set<const SlotDescriptor*> missing_field_slots_;
/// Selected column(type) ids of the ORC file. Use list instead of vector here since
/// orc::RowReaderOptions.includeTypes() expects a list
std::list<uint64_t> selected_type_ids_;
/// Map from orc::Type id to SchemaPath. See more in descriptors.h for the definition
/// of SchemaPath. The map is used in the constructors of OrcColumnReaders where we
/// resolve SchemaPaths of the descriptors into ORC columns. Built in 'Open'.
std::vector<SchemaPath> col_id_path_map_;
/// Scan range for the metadata (file tail).
const io::ScanRange* metadata_range_ = nullptr;
/// Timer for materializing rows. This ignores time getting the next buffer.
@@ -176,49 +222,63 @@ class HdfsOrcScanner : public HdfsScanner {
/// with the midpoint of any stripe in the file.
RuntimeProfile::Counter* num_scanners_with_no_reads_counter_ = nullptr;
/// Number of collection items read in current row batch. It is a scanner-local counter
/// used to reduce the frequency of updating HdfsScanNode counter. It is updated by the
/// callees of AssembleRows() and is merged into the HdfsScanNode counter at the end of
/// AssembleRows() and then is reset to 0.
int64_t coll_items_read_counter_;
const char *filename() const { return metadata_range_->file(); }
virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
/// Advances 'stripe_idx_' to the next non-empty stripe and initializes
/// row_reader_ to scan it.
Status NextStripe() WARN_UNUSED_RESULT;
/// Reads data using orc-reader to materialize instances of 'tuple_desc'.
/// Reads data to materialize instances of 'tuple_desc'.
/// Returns a non-OK status if a non-recoverable error was encountered and execution
/// of this query should be terminated immediately.
Status AssembleRows(RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Function used by TransferScratchTuples() to read a single row from scratch_batch_
/// into 'tuple'.
Status ReadRow(const orc::StructVectorBatch& batch, int row_idx,
const orc::Type* orc_type, Tuple* tuple, RowBatch* dst_batch) WARN_UNUSED_RESULT;
/// Materialize collection(list/map) tuples belonging to the 'row_idx'-th row of
/// coll_reader's ORC batch. Each column reader will hold an ORC batch until its values
/// are drained.
Status AssembleCollection(const OrcComplexColumnReader& coll_reader, int row_idx,
CollectionValueBuilder* coll_value_builder) WARN_UNUSED_RESULT;
/// Evaluates runtime filters and conjuncts (if any) against the tuples in
/// 'scratch_batch_', and adds the surviving tuples to the given batch.
/// Returns the number of rows that should be committed to the given batch.
Status TransferScratchTuples(RowBatch* dst_batch) WARN_UNUSED_RESULT;
/// Transfer rows in 'orc_root_batch_' into tuples in 'dst_batch'. Evaluates runtime
/// filters and conjuncts (if any) against the tuples. Only surviving tuples are added
/// to the given batch. Returns if either 'orc_root_batch_' is drained or 'dst_batch'
/// is full.
Status TransferTuples(OrcComplexColumnReader* column_reader,
RowBatch* dst_batch) WARN_UNUSED_RESULT;
/// Process the file footer and parse file_metadata_. This should be called with the
/// last FOOTER_SIZE bytes in context_.
Status ProcessFileTail() WARN_UNUSED_RESULT;
/// Update reader options used in orc reader by the given tuple descriptor.
Status SelectColumns(const TupleDescriptor* tuple_desc) WARN_UNUSED_RESULT;
/// Resolve SchemaPath in TupleDescriptors and translate them to ORC type ids into
/// 'selected_nodes'. Track the position slots by pre-order traversal in the
/// descriptors and push them to a stack as 'pos_slots'.
Status ResolveColumns(const TupleDescriptor& tuple_desc,
std::list<const orc::Type*>* selected_nodes,
std::stack<const SlotDescriptor*>* pos_slots);
/// Validate whether the ColumnType is compatible with the orc type
Status ValidateType(const ColumnType& type, const orc::Type& orc_type)
WARN_UNUSED_RESULT;
/// Resolve 'tuple_desc' to get selected columns. Update 'row_reader_options' with the
/// selected type ids.
Status SelectColumns(const TupleDescriptor& tuple_desc) WARN_UNUSED_RESULT;
/// Part of the HdfsScanner interface, not used in Orc.
Status InitNewRange() override WARN_UNUSED_RESULT { return Status::OK(); }
THdfsCompression::type TranslateCompressionKind(orc::CompressionKind kind);
inline bool ScratchBatchNotEmpty();
inline Status AllocateTupleMem(RowBatch* row_batch) WARN_UNUSED_RESULT;
bool IsPartitionKeySlot(const SlotDescriptor* slot);
bool IsMissingField(const SlotDescriptor* slot);
};
} // namespace impala

be/src/exec/orc-column-readers.cc (new file)

@@ -0,0 +1,612 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/orc-column-readers.h"
#include <queue>
#include "runtime/collection-value-builder.h"
#include "runtime/timestamp-value.inline.h"
#include "common/names.h"
using namespace impala;
namespace impala {
string PrintNode(const orc::Type* node) {
return Substitute("$0 column (ORC id=$1)", node->toString(), node->getColumnId());
}
OrcColumnReader* OrcColumnReader::Create(const orc::Type* node,
const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner) {
DCHECK(node != nullptr);
DCHECK(slot_desc != nullptr);
OrcColumnReader* reader = nullptr;
if (node->getKind() == orc::TypeKind::STRUCT) {
reader = new OrcStructReader(node, slot_desc, scanner);
} else if (node->getKind() == orc::TypeKind::LIST) {
reader = new OrcListReader(node, slot_desc, scanner);
} else if (node->getKind() == orc::TypeKind::MAP) {
reader = new OrcMapReader(node, slot_desc, scanner);
} else {
switch (slot_desc->type().type) {
case TYPE_BOOLEAN:
reader = new OrcBoolColumnReader(node, slot_desc, scanner);
break;
case TYPE_TINYINT:
reader = new OrcIntColumnReader<int8_t>(node, slot_desc, scanner);
break;
case TYPE_SMALLINT:
reader = new OrcIntColumnReader<int16_t>(node, slot_desc, scanner);
break;
case TYPE_INT:
reader = new OrcIntColumnReader<int32_t>(node, slot_desc, scanner);
break;
case TYPE_BIGINT:
reader = new OrcIntColumnReader<int64_t>(node, slot_desc, scanner);
break;
case TYPE_FLOAT:
reader = new OrcDoubleColumnReader<float>(node, slot_desc, scanner);
break;
case TYPE_DOUBLE:
reader = new OrcDoubleColumnReader<double>(node, slot_desc, scanner);
break;
case TYPE_TIMESTAMP:
reader = new OrcTimestampReader(node, slot_desc, scanner);
break;
case TYPE_STRING:
case TYPE_VARCHAR:
case TYPE_CHAR:
reader = new OrcStringColumnReader(node, slot_desc, scanner);
break;
case TYPE_DECIMAL:
if (node->getPrecision() == 0 || node->getPrecision() > 18) {
// For decimals whose precision is larger than 18, its value can't fit into
// an int64 (10^19 > 2^63). So we should use int128 for this case.
reader = new OrcDecimal16ColumnReader(node, slot_desc, scanner);
} else {
switch (slot_desc->type().GetByteSize()) {
case 4:
reader = new OrcDecimalColumnReader<Decimal4Value>(
node, slot_desc, scanner);
break;
case 8:
reader = new OrcDecimalColumnReader<Decimal8Value>(
node, slot_desc, scanner);
break;
case 16:
reader = new OrcDecimalColumnReader<Decimal16Value>(
node, slot_desc, scanner);
break;
default:
DCHECK(false) << "invalid byte size for decimal type: "
<< slot_desc->type().GetByteSize();
}
}
break;
default:
DCHECK(false) << slot_desc->type().DebugString();
} // end of switch
}
return scanner->obj_pool_.Add(reader);
}
OrcComplexColumnReader* OrcComplexColumnReader::CreateTopLevelReader(
const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner) {
OrcComplexColumnReader* reader = nullptr;
if (node->getKind() == orc::TypeKind::STRUCT) {
reader = new OrcStructReader(node, table_tuple_desc, scanner);
} else if (node->getKind() == orc::TypeKind::LIST) {
reader = new OrcListReader(node, table_tuple_desc, scanner);
} else if (node->getKind() == orc::TypeKind::MAP) {
reader = new OrcMapReader(node, table_tuple_desc, scanner);
} else {
DCHECK(false) << "Can't create top level reader for " << PrintNode(node);
}
return scanner->obj_pool_.Add(reader);
}
OrcColumnReader::OrcColumnReader(const orc::Type* orc_type,
const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
: slot_desc_(slot_desc), scanner_(scanner) {
orc_column_id_ = DCHECK_NOTNULL(orc_type)->getColumnId();
if (slot_desc_ == nullptr) {
orc::TypeKind type_kind = orc_type->getKind();
DCHECK(type_kind == orc::TypeKind::LIST
|| type_kind == orc::TypeKind::MAP
|| type_kind == orc::TypeKind::STRUCT)
<< "Selected primitive types should have SlotDescriptors";
}
VLOG(3) << "Created reader for " << PrintNode(orc_type) << ": slot_desc_="
<< (slot_desc_? slot_desc_->DebugString() : "null");
}
Status OrcBoolColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
int64_t val = batch_->data.data()[row_idx];
*(reinterpret_cast<bool*>(GetSlot(tuple))) = (val != 0);
return Status::OK();
}
Status OrcStringColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
const char* src_ptr = batch_->data.data()[row_idx];
int64_t src_len = batch_->length.data()[row_idx];
int dst_len = slot_desc_->type().len;
if (slot_desc_->type().type == TYPE_CHAR) {
int unpadded_len = min(dst_len, static_cast<int>(src_len));
char* dst_char = reinterpret_cast<char*>(GetSlot(tuple));
memcpy(dst_char, src_ptr, unpadded_len);
StringValue::PadWithSpaces(dst_char, dst_len, unpadded_len);
return Status::OK();
}
StringValue* dst = reinterpret_cast<StringValue*>(GetSlot(tuple));
if (slot_desc_->type().type == TYPE_VARCHAR && src_len > dst_len) {
dst->len = dst_len;
} else {
dst->len = src_len;
}
// Space in the StringVectorBatch is allocated by scanner_->reader_mem_pool_. It will
// be reused at next batch, so we allocate a new space for this string.
uint8_t* buffer = pool->TryAllocateUnaligned(dst->len);
if (buffer == nullptr) {
string details = Substitute("Could not allocate string buffer of $0 bytes "
"for ORC file '$1'.", dst->len, scanner_->filename());
return scanner_->scan_node_->mem_tracker()->MemLimitExceeded(
scanner_->state_, details, dst->len);
}
dst->ptr = reinterpret_cast<char*>(buffer);
memcpy(dst->ptr, src_ptr, dst->len);
return Status::OK();
}
Status OrcTimestampReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
int64_t secs = batch_->data.data()[row_idx];
int64_t nanos = batch_->nanoseconds.data()[row_idx];
auto slot = reinterpret_cast<TimestampValue*>(GetSlot(tuple));
*slot = TimestampValue::FromUnixTimeNanos(secs, nanos,
scanner_->state_->local_time_zone());
return Status::OK();
}
Status OrcDecimal16ColumnReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
orc::Int128 orc_val = batch_->values.data()[row_idx];
DCHECK_EQ(slot_desc_->type().GetByteSize(), 16);
int128_t val = orc_val.getHighBits();
val <<= 64;
val |= orc_val.getLowBits();
// Use memcpy to avoid gcc generating unaligned instructions like movaps
// for int128_t. They will raise SegmentFault when addresses are not
// aligned to 16 bytes.
memcpy(GetSlot(tuple), &val, sizeof(int128_t));
return Status::OK();
}
OrcComplexColumnReader::OrcComplexColumnReader(const orc::Type* node,
const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
: OrcColumnReader(node, nullptr, scanner) {
SchemaPath& path = scanner->col_id_path_map_[node->getColumnId()];
if (path == table_tuple_desc->tuple_path()) tuple_desc_ = table_tuple_desc;
materialize_tuple_ = (tuple_desc_ != nullptr);
VLOG(3) << "Created top level ComplexColumnReader for " << PrintNode(node)
<< ": tuple_desc_=" << (tuple_desc_ ? tuple_desc_->DebugString() : "null");
}
bool OrcComplexColumnReader::EndOfBatch() {
DCHECK(slot_desc_ == nullptr
&& (tuple_desc_ == nullptr || tuple_desc_ == scanner_->scan_node_->tuple_desc()))
<< "Should be top level reader when calling EndOfBatch()";
if (!materialize_tuple_) {
// If this reader is not materializing tuples, its 'row_idx_' is invalid and the
// progress is tracked in the child. Delegate the judgement to the child recursively.
DCHECK_EQ(children_.size(), 1);
return static_cast<OrcComplexColumnReader*>(children_[0])->EndOfBatch();
}
if (vbatch_) DCHECK_LE(row_idx_, vbatch_->numElements);
return vbatch_ == nullptr || row_idx_ == vbatch_->numElements;
}
inline bool PathContains(const SchemaPath& path, const SchemaPath& sub_path) {
return path.size() >= sub_path.size() &&
std::equal(sub_path.begin(), sub_path.end(), path.begin());
}
inline const SchemaPath& GetTargetColPath(const SlotDescriptor* slot_desc) {
return slot_desc->type().IsCollectionType() ?
slot_desc->collection_item_descriptor()->tuple_path(): slot_desc->col_path();
}
bool OrcStructReader::FindChild(const orc::Type& parent, const SchemaPath& child_path,
const orc::Type** child, int* field) {
int size = parent.getSubtypeCount();
for (int c = 0; c < size; ++c) {
const orc::Type* node = parent.getSubtype(c);
const SchemaPath& node_path = scanner_->col_id_path_map_[node->getColumnId()];
if (PathContains(child_path, node_path)) {
*child = node;
*field = c;
return true;
}
}
return false;
}
void OrcStructReader::CreateChildForSlot(const orc::Type* curr_node,
const SlotDescriptor* slot_desc) {
// 'slot_desc' matches a descendant of 'curr_node' which may not be a direct child.
// Find a child node that lays in the path from 'curr_node' to the descendant.
// Create a child reader and pass down 'slot_desc'.
const orc::Type* child_node;
int field;
if (!FindChild(*curr_node, GetTargetColPath(slot_desc), &child_node, &field)) {
DCHECK(false) << PrintNode(curr_node) << " has no children selected for "
<< slot_desc->DebugString();
}
OrcColumnReader* child = OrcColumnReader::Create(child_node, slot_desc, scanner_);
children_.push_back(child);
children_fields_.push_back(field);
}
OrcStructReader::OrcStructReader(const orc::Type* node,
const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
: OrcComplexColumnReader(node, table_tuple_desc, scanner) {
if (materialize_tuple_) {
for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
// Skip partition columns and missing columns
if (scanner->IsPartitionKeySlot(child_slot)
|| scanner->IsMissingField(child_slot)) {
continue;
}
CreateChildForSlot(node, child_slot);
}
} else {
// No tuples should be materialized by this reader, because 'table_tuple_desc'
// matches to a descendant of 'node'. Those tuples should be materialized by the
// corresponding descendant reader. So 'node' should have exactly one selected
// subtype: the child in the path to the target descendant.
DCHECK_EQ(node->getSubtypeCount(), 1);
OrcComplexColumnReader* child = OrcComplexColumnReader::CreateTopLevelReader(
node->getSubtype(0), table_tuple_desc, scanner);
children_.push_back(child);
children_fields_.push_back(0);
}
}
OrcStructReader::OrcStructReader(const orc::Type* node,
const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
: OrcComplexColumnReader(node, slot_desc, scanner) {
// 'slot_desc' won't map to a STRUCT column. It only matches a descendant column.
// If the descendant column is missing in the file, skip creating the child reader.
if (scanner->IsMissingField(slot_desc)) return;
CreateChildForSlot(node, slot_desc);
VLOG(3) << "Created StructReader for " << PrintNode(node) << ": slot_desc_="
<< slot_desc->DebugString();
}
Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
return Status::OK();
}
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->ReadValue(row_idx, tuple, pool));
}
return Status::OK();
}
void OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
OrcComplexColumnReader::UpdateInputBatch(orc_batch);
batch_ = static_cast<orc::StructVectorBatch*>(orc_batch);
// In debug mode, we use dynamic_cast<> to double-check the downcast is legal
DCHECK(batch_ == dynamic_cast<orc::StructVectorBatch*>(orc_batch));
if (batch_ == nullptr || batch_->numElements == 0) {
row_idx_ = 0;
for (OrcColumnReader* child : children_) child->UpdateInputBatch(nullptr);
return;
}
row_idx_ = 0;
int size = children_.size();
for (int c = 0; c < size; ++c) {
children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]]);
}
}
Status OrcStructReader::TransferTuple(Tuple* tuple, MemPool* pool) {
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->ReadValue(row_idx_, tuple, pool));
}
++row_idx_;
return Status::OK();
}
OrcCollectionReader::OrcCollectionReader(const orc::Type* node,
const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner)
: OrcComplexColumnReader(node, slot_desc, scanner) {
const SchemaPath& path = scanner->col_id_path_map_[node->getColumnId()];
if (slot_desc->type().IsCollectionType() &&
slot_desc->collection_item_descriptor()->tuple_path() == path) {
// This is a collection SlotDescriptor whose item TupleDescriptor matches our
// SchemaPath. We should materialize the slot (creating a CollectionValue) and its
// collection tuples (see more in HdfsOrcScanner::AssembleCollection).
tuple_desc_ = slot_desc->collection_item_descriptor();
materialize_tuple_ = true;
}
}
Status OrcCollectionReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
if (IsNull(DCHECK_NOTNULL(vbatch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
auto coll_slot = reinterpret_cast<CollectionValue*>(GetSlot(tuple));
*coll_slot = CollectionValue();
const TupleDescriptor* tuple_desc = slot_desc_->collection_item_descriptor();
CollectionValueBuilder builder(coll_slot, *tuple_desc, pool, scanner_->state_);
return scanner_->AssembleCollection(*this, row_idx, &builder);
}
void OrcListReader::CreateChildForSlot(const orc::Type* node,
const SlotDescriptor* slot_desc) {
int depth = scanner_->col_id_path_map_[node->getColumnId()].size();
const SchemaPath& target_path = GetTargetColPath(slot_desc);
DCHECK_GT(target_path.size(), depth);
int field = target_path[depth];
if (field == SchemaPathConstants::ARRAY_POS) {
DCHECK(pos_slot_desc_ == nullptr) << "Should have unique pos slot";
pos_slot_desc_ = slot_desc;
} else {
DCHECK_EQ(field, SchemaPathConstants::ARRAY_ITEM);
OrcColumnReader* child = OrcColumnReader::Create(node->getSubtype(0), slot_desc,
scanner_);
children_.push_back(child);
}
}
OrcListReader::OrcListReader(const orc::Type* node,
const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
: OrcCollectionReader(node, table_tuple_desc, scanner) {
if (materialize_tuple_) {
DCHECK(tuple_desc_ != nullptr);
for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
CreateChildForSlot(node, child_slot);
}
} else {
OrcComplexColumnReader* child = OrcComplexColumnReader::CreateTopLevelReader(
node->getSubtype(0), table_tuple_desc, scanner);
children_.push_back(child);
}
}
OrcListReader::OrcListReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner) : OrcCollectionReader(node, slot_desc, scanner) {
if (materialize_tuple_) {
DCHECK(tuple_desc_ != nullptr);
for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
CreateChildForSlot(node, child_slot);
}
} else {
// 'slot_desc' matches a descendant instead. Create a child reader for the child node
// laying in the path to the descendant.
CreateChildForSlot(node, slot_desc);
}
VLOG(3) << "Created ListReader for " << PrintNode(node) << ": tuple_desc_="
<< (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
}
void OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
OrcComplexColumnReader::UpdateInputBatch(orc_batch);
batch_ = static_cast<orc::ListVectorBatch*>(orc_batch);
// In debug mode, we use dynamic_cast<> to double-check the downcast is legal
DCHECK(batch_ == dynamic_cast<orc::ListVectorBatch*>(orc_batch));
orc::ColumnVectorBatch* item_batch = batch_ ? batch_->elements.get() : nullptr;
for (OrcColumnReader* child : children_) child->UpdateInputBatch(item_batch);
if (batch_) {
row_idx_ = -1;
NextRow();
}
}
int OrcListReader::GetNumTuples(int row_idx) const {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) return 0;
DCHECK_GT(batch_->offsets.size(), row_idx + 1);
return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx];
}
int OrcListReader::GetChildBatchOffset(int row_idx) const {
return batch_->offsets[row_idx];
}
Status OrcListReader::TransferTuple(Tuple* tuple, MemPool* pool) {
if (pos_slot_desc_) {
int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(
tuple->GetSlot(pos_slot_desc_->tuple_offset()));
*slot_val_ptr = array_idx_;
}
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->ReadValue(array_start_ + array_idx_, tuple, pool));
}
array_idx_++;
if (array_start_ + array_idx_ >= array_end_) NextRow();
return Status::OK();
}
Status OrcListReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
MemPool* pool) const {
DCHECK_LT(row_idx, batch_->numElements);
int offset = batch_->offsets[row_idx];
if (pos_slot_desc_) {
int64_t* slot_val_ptr = reinterpret_cast<int64_t*>(
tuple->GetSlot(pos_slot_desc_->tuple_offset()));
*slot_val_ptr = tuple_idx;
}
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->ReadValue(offset + tuple_idx, tuple, pool));
}
return Status::OK();
}
void OrcListReader::NextRow() {
do {
++row_idx_;
if (row_idx_ >= batch_->numElements) break;
array_start_ = batch_->offsets[row_idx_];
array_end_ = batch_->offsets[row_idx_ + 1];
} while (IsNull(batch_, row_idx_) || array_start_ == array_end_);
array_idx_ = 0;
}
void OrcMapReader::CreateChildForSlot(const orc::Type* node,
const SlotDescriptor* slot_desc) {
const SchemaPath& path = scanner_->col_id_path_map_[node->getColumnId()];
const SchemaPath& target_path = GetTargetColPath(slot_desc);
int depth = path.size();
// The target of 'slot_desc' matches a descendant so its SchemaPath should be deeper
DCHECK_GT(target_path.size(), depth);
int field = target_path[depth];
const orc::Type* child_type;
if (field == SchemaPathConstants::MAP_KEY) {
child_type = node->getSubtype(0);
} else {
DCHECK_EQ(field, SchemaPathConstants::MAP_VALUE);
child_type = node->getSubtype(1);
}
DCHECK(child_type != nullptr) << Substitute(
"$0 matches an empty child of $1: path=$2, target_path=$3",
slot_desc->DebugString(), PrintNode(node), PrintNumericPath(path),
PrintNumericPath(target_path));
OrcColumnReader* child = OrcColumnReader::Create(child_type, slot_desc, scanner_);
children_.push_back(child);
if (field == SchemaPathConstants::MAP_KEY) {
key_readers_.push_back(child);
} else {
value_readers_.push_back(child);
}
}
OrcMapReader::OrcMapReader(const orc::Type* node,
const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
: OrcCollectionReader(node, table_tuple_desc, scanner) {
if (materialize_tuple_) {
DCHECK(tuple_desc_ != nullptr);
for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
CreateChildForSlot(node, child_slot);
}
} else {
// 'table_tuple_desc' should match a descendant of 'node'
int depth = scanner->col_id_path_map_[node->getColumnId()].size();
DCHECK_GT(table_tuple_desc->tuple_path().size(), depth);
// Create a child corresponding to the subtype in the path to the descendant.
int field = table_tuple_desc->tuple_path()[depth];
DCHECK(field == SchemaPathConstants::MAP_KEY ||
field == SchemaPathConstants::MAP_VALUE);
bool key_selected = (field == SchemaPathConstants::MAP_KEY);
const orc::Type* child_type =
key_selected ? node->getSubtype(0) : node->getSubtype(1);
OrcComplexColumnReader* child = OrcComplexColumnReader::CreateTopLevelReader(
child_type, table_tuple_desc, scanner);
children_.push_back(child);
if (key_selected) {
key_readers_.push_back(child);
} else {
value_readers_.push_back(child);
}
}
VLOG(3) << "Created MapReader for " << PrintNode(node) << ": tuple_desc_="
<< (tuple_desc_ != nullptr ? tuple_desc_->DebugString() : "null");
}
OrcMapReader::OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner) : OrcCollectionReader(node, slot_desc, scanner) {
if (materialize_tuple_) {
DCHECK(tuple_desc_ != nullptr);
for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
CreateChildForSlot(node, child_slot);
}
} else {
CreateChildForSlot(node, slot_desc);
}
}
void OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
OrcComplexColumnReader::UpdateInputBatch(orc_batch);
batch_ = static_cast<orc::MapVectorBatch*>(orc_batch);
// In debug mode, we use dynamic_cast<> to double-check the downcast is legal
DCHECK(batch_ == dynamic_cast<orc::MapVectorBatch*>(orc_batch));
orc::ColumnVectorBatch* key_batch = batch_ ? batch_->keys.get() : nullptr;
orc::ColumnVectorBatch* value_batch = batch_ ? batch_->elements.get() : nullptr;
for (OrcColumnReader* child : key_readers_) child->UpdateInputBatch(key_batch);
for (OrcColumnReader* child : value_readers_) child->UpdateInputBatch(value_batch);
if (batch_) {
row_idx_ = -1;
NextRow();
}
}
void OrcMapReader::NextRow() {
do {
++row_idx_;
if (row_idx_ >= batch_->numElements) break;
array_offset_ = batch_->offsets[row_idx_];
array_end_ = batch_->offsets[row_idx_ + 1];
} while (IsNull(batch_, row_idx_) || array_offset_ == array_end_);
}
int OrcMapReader::GetNumTuples(int row_idx) const {
if (IsNull(batch_, row_idx)) return 0;
DCHECK_GT(batch_->offsets.size(), row_idx + 1);
return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx];
}
int OrcMapReader::GetChildBatchOffset(int row_idx) const {
return batch_->offsets[row_idx];
}
Status OrcMapReader::TransferTuple(Tuple* tuple, MemPool* pool) {
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->ReadValue(array_offset_, tuple, pool));
}
array_offset_++;
if (array_offset_ >= array_end_) NextRow();
return Status::OK();
}
Status OrcMapReader::ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
MemPool* pool) const {
DCHECK_LT(row_idx, batch_->numElements);
int offset = batch_->offsets[row_idx];
for (OrcColumnReader* child : children_) {
RETURN_IF_ERROR(child->ReadValue(offset + tuple_idx, tuple, pool));
}
return Status::OK();
}
}
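A minimal standalone sketch (not Impala code) of the offsets walk that OrcListReader and OrcMapReader perform above: row i of a collection column owns the child-batch range [offsets[i], offsets[i+1]), so GetChildBatchOffset() is the range start, GetNumTuples() is its length, and empty rows are skipped the same way NextRow() skips them. Plain vectors stand in for orc::ListVectorBatch here.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
  // Three rows of an array column: row 0 = {10, 11}, row 1 = {} (skipped),
  // row 2 = {12}. 'offsets' has one more entry than the number of rows.
  std::vector<int64_t> offsets = {0, 2, 2, 3};
  std::vector<int64_t> items = {10, 11, 12};
  for (std::size_t row = 0; row + 1 < offsets.size(); ++row) {
    int64_t start = offsets[row];      // GetChildBatchOffset(row)
    int64_t end = offsets[row + 1];
    int64_t num_tuples = end - start;  // GetNumTuples(row)
    if (num_tuples == 0) continue;     // empty collection: nothing to materialize
    std::cout << "row " << row << ":";
    for (int64_t i = start; i < end; ++i) std::cout << " " << items[i];
    std::cout << "\n";
  }
  return 0;
}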

View File

@@ -0,0 +1,485 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <orc/OrcFile.hh>
#include <queue>
#include "exec/hdfs-orc-scanner.h"
namespace impala {
class HdfsOrcScanner;
/// Base class for reading an ORC column. Each column reader keeps track of an
/// orc::ColumnVectorBatch and transfers its values into Impala internals (tuples/slots).
///
/// We implement a subclass for each primitive type. Each subclass keeps the SlotDescriptor
/// to locate the slot to materialize. Basically, the usage of the interfaces follows the
/// pattern:
/// reader1 = Create(orc_node1, slot_desc1, orc_scanner);
/// reader2 = Create(orc_node2, slot_desc2, orc_scanner);
/// while ( /* has new batch in the stripe */ ) {
/// reader1->UpdateInputBatch(orc_batch_of_column1)
/// reader2->UpdateInputBatch(orc_batch_of_column2)
/// while ( /* has more rows to read */ ) {
/// tuple = ... // Init tuple
/// reader1->ReadValue(row_idx, tuple, mem_pool);
/// reader2->ReadValue(row_idx, tuple, mem_pool);
/// row_idx++;
/// }
/// }
///
/// Complex type readers can be top-level readers (readers materializing table-level
/// tuples), so they need more interfaces to deal with table/collection level tuple
/// materialization. See more in the class comment of OrcComplexColumnReader.
class OrcColumnReader {
public:
/// Create a column reader for the given 'slot_desc' based on the ORC 'node'. We say
/// the 'slot_desc' and ORC 'node' match iff
/// scanner->col_id_path_map_[node->getColumnId()] == slot_desc->col_path
/// Callers should guarantee that 'slot_desc' matches the ORC 'node' or one of its
/// descendants. If 'node' is a primitive type, 'slot_desc' should match it since
/// primitive types don't have descendants.
/// If 'node' is a complex type (struct/array/map) and does not match 'slot_desc',
/// the created reader will use the 'slot_desc' to create its children. See more in
/// constructors of complex column readers.
/// The Create function adds the object to the obj_pool of the parent HdfsOrcScanner.
static OrcColumnReader* Create(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
/// Base constructor for all types of readers that hold a SlotDescriptor (non top-level
/// readers). Primitive column readers will materialize values into the slot. STRUCT
/// column readers will delegate the slot materialization to their children. Collection
/// column (ARRAY/MAP) readers will create a CollectionValue in the slot and assemble
/// collection tuples referenced by the CollectionValue. (See more in 'ReadValue')
OrcColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
virtual ~OrcColumnReader() { }
/// Defaults to true for primitive column readers. Only complex column readers may not
/// materialize tuples.
virtual bool MaterializeTuple() const { return true; }
/// Whether it's a reader for a STRUCT/ARRAY/MAP column.
virtual bool IsComplexColumnReader() const { return false; }
/// Whether it's a reader for an ARRAY/MAP column.
virtual bool IsCollectionReader() const { return false; }
/// Update the ORC batch we track. We'll read values from it.
virtual void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) = 0;
/// Read the value at 'row_idx' of the ColumnVectorBatch into a slot of the given 'tuple'.
/// Use 'pool' to allocate memory if needed. Depends on UpdateInputBatch() being called
/// beforehand (so that batch_ is up to date).
virtual Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
WARN_UNUSED_RESULT = 0;
protected:
friend class OrcStructReader;
/// Convenient field for debugging. We can't keep a pointer to the orc::Type since it
/// will be destroyed once the orc::RowReader is released, so only the orc::Type id is
/// kept here.
uint64_t orc_column_id_;
/// If the reader is materializing a slot inside a tuple, the SlotDescriptor is kept.
/// Otherwise (top level readers), 'slot_desc_' will be nullptr.
const SlotDescriptor* slot_desc_;
HdfsOrcScanner* scanner_;
inline static bool IsNull(orc::ColumnVectorBatch* orc_batch, int row_idx) {
return orc_batch->hasNulls && !orc_batch->notNull[row_idx];
}
/// Set the reader's slot in the given 'tuple' to NULL
virtual void SetNullSlot(Tuple* tuple) {
tuple->SetNull(DCHECK_NOTNULL(slot_desc_)->null_indicator_offset());
}
inline void* GetSlot(Tuple* tuple) const {
return tuple->GetSlot(DCHECK_NOTNULL(slot_desc_)->tuple_offset());
}
};
class OrcBoolColumnReader : public OrcColumnReader {
public:
OrcBoolColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
// In debug mode, we use dynamic_cast<> to double-check the downcast is legal
DCHECK(batch_ == dynamic_cast<orc::LongVectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
private:
orc::LongVectorBatch* batch_ = nullptr;
};
template<typename T>
class OrcIntColumnReader : public OrcColumnReader {
public:
OrcIntColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
batch_ = static_cast<orc::LongVectorBatch*>(orc_batch);
DCHECK(batch_ == dynamic_cast<orc::LongVectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
override WARN_UNUSED_RESULT {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
int64_t val = batch_->data.data()[row_idx];
*(reinterpret_cast<T*>(GetSlot(tuple))) = val;
return Status::OK();
}
private:
orc::LongVectorBatch* batch_ = nullptr;
};
template<typename T>
class OrcDoubleColumnReader : public OrcColumnReader {
public:
OrcDoubleColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
batch_ = static_cast<orc::DoubleVectorBatch*>(orc_batch);
DCHECK(batch_ == dynamic_cast<orc::DoubleVectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
override WARN_UNUSED_RESULT {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
double val = batch_->data.data()[row_idx];
*(reinterpret_cast<T*>(GetSlot(tuple))) = val;
return Status::OK();
}
private:
orc::DoubleVectorBatch* batch_ = nullptr;
};
class OrcStringColumnReader : public OrcColumnReader {
public:
OrcStringColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
batch_ = static_cast<orc::StringVectorBatch*>(orc_batch);
DCHECK(batch_ == dynamic_cast<orc::StringVectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
private:
orc::StringVectorBatch* batch_ = nullptr;
};
class OrcTimestampReader : public OrcColumnReader {
public:
OrcTimestampReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
batch_ = static_cast<orc::TimestampVectorBatch*>(orc_batch);
DCHECK(batch_ == dynamic_cast<orc::TimestampVectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
private:
orc::TimestampVectorBatch* batch_ = nullptr;
};
template<typename DECIMAL_TYPE>
class OrcDecimalColumnReader : public OrcColumnReader {
public:
OrcDecimalColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
// Reminder: even decimal(1,1) is stored in int64 batch
batch_ = static_cast<orc::Decimal64VectorBatch*>(orc_batch);
DCHECK(batch_ == dynamic_cast<orc::Decimal64VectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool)
override WARN_UNUSED_RESULT {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) {
SetNullSlot(tuple);
return Status::OK();
}
int64_t val = batch_->values.data()[row_idx];
reinterpret_cast<DECIMAL_TYPE*>(GetSlot(tuple))->value() = val;
return Status::OK();
}
private:
orc::Decimal64VectorBatch* batch_ = nullptr;
};
class OrcDecimal16ColumnReader : public OrcColumnReader {
public:
OrcDecimal16ColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner)
: OrcColumnReader(node, slot_desc, scanner) { }
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
batch_ = static_cast<orc::Decimal128VectorBatch*>(orc_batch);
DCHECK(batch_ == dynamic_cast<orc::Decimal128VectorBatch*>(orc_batch));
}
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
private:
orc::Decimal128VectorBatch* batch_ = nullptr;
};
/// Base class for reading a complex column. The two subclasses are OrcStructReader and
/// OrcCollectionReader. Each OrcComplexColumnReader has child readers for its subtypes.
/// Each slot maps to a child. Different children may map to the same subtype, because
/// SlotDescriptors of a TupleDescriptor may share the same col_path (e.g. when there are
/// subqueries). The root reader is always an OrcStructReader since the root of the ORC
/// schema is represented as a STRUCT type.
///
/// Only OrcComplexColumnReaders can be top-level readers: readers that control the
/// materialization of the top-level tuples, whether directly or indirectly (via their
/// unique child).
///
/// There is only one top-level reader that directly materializes top-level (table-level)
/// tuples: the reader whose orc_node matches the tuple_path of the top-level
/// TupleDescriptor. For this reader, the usage of the interfaces follows the pattern:
/// while ( /* has new batch in the stripe */ ) {
/// reader->UpdateInputBatch(orc_batch);
/// while (!reader->EndOfBatch()) {
/// tuple = ... // Init tuple
/// reader->TransferTuple(tuple, mem_pool);
/// }
/// }
/// 'TransferTuple' doesn't require a row index since the top-level reader keeps
/// track of the progress with internal fields:
/// * STRUCT reader: row_idx_
/// * LIST reader: row_idx_, array_start_, array_idx_, array_end_
/// * MAP reader: row_idx_, array_offset_, array_end_
///
/// Top-level readers that indirectly materialize tuples are ancestors of the above
/// reader. Such readers just call UpdateInputBatch (which updates their children
/// recursively) and then delegate the materialization to their children. (See more in
/// HdfsOrcScanner::TransferTuples)
///
/// Non top-level readers can be divided into two kinds by whether they should
/// materialize collection tuples (reflected by materialize_tuple_). STRUCT is not a
/// collection type, so non top-level STRUCT readers always have materialize_tuple_ set
/// to false by default.
///
/// Non top-level collection type readers create a CollectionValue and a
/// CollectionValueBuilder when 'ReadValue' is called, then recursively delegate the
/// materialization of collection tuples to the child that matches the TupleDescriptor.
/// That child tracks the boundary of the current collection and calls 'ReadChildrenValue'
/// to assemble collection tuples. (See more in HdfsOrcScanner::AssembleCollection)
///
/// Children readers are created in the constructor recursively.
class OrcComplexColumnReader : public OrcColumnReader {
public:
static OrcComplexColumnReader* CreateTopLevelReader(const orc::Type* node,
const TupleDescriptor* tuple_desc, HdfsOrcScanner* scanner);
/// Constructor for top-level readers
OrcComplexColumnReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner);
/// Constructor for non top-level readers
OrcComplexColumnReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner) : OrcColumnReader(node, slot_desc, scanner) { }
bool IsComplexColumnReader() const override { return true; }
bool MaterializeTuple() const override { return materialize_tuple_; }
/// Whether we've finished reading the current orc batch.
bool EndOfBatch();
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override {
vbatch_ = orc_batch;
}
/// Assemble the current collection value (tracked by 'row_idx_') into a top-level 'tuple'.
/// Depends on UpdateInputBatch() being called beforehand (so that batch_ is up to date).
virtual Status TransferTuple(Tuple* tuple, MemPool* pool) WARN_UNUSED_RESULT = 0;
/// Num of tuples inside the 'row_idx'-th row. LIST/MAP types will have 0 to N tuples.
/// STRUCT type will always have one tuple.
virtual int GetNumTuples(int row_idx) const = 0;
/// Collection values (array items, map keys/values) are concatenated in the child's
/// batch. Get the start offset of values inside the 'row_idx'-th collection.
virtual int GetChildBatchOffset(int row_idx) const = 0;
const vector<OrcColumnReader*>& children() const { return children_; }
protected:
vector<OrcColumnReader*> children_;
/// Holds the TupleDescriptor if we should materialize its tuples
const TupleDescriptor* tuple_desc_ = nullptr;
bool materialize_tuple_ = false;
/// Keeps the row index if this is a top-level reader
int row_idx_;
/// Convenient reference to 'batch_' of subclass.
orc::ColumnVectorBatch* vbatch_ = nullptr;
};
class OrcStructReader : public OrcComplexColumnReader {
public:
/// Constructor for top level reader
OrcStructReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner);
OrcStructReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
int GetNumTuples(int row_idx) const override { return 1; }
int GetChildBatchOffset(int row_idx) const override { return row_idx; }
private:
orc::StructVectorBatch* batch_ = nullptr;
/// Field ids of the child readers
std::vector<int> children_fields_;
void SetNullSlot(Tuple* tuple) override {
for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
}
void CreateChildForSlot(const orc::Type* curr_node, const SlotDescriptor* slot_desc);
/// Find which child of 'curr_node' matches the 'child_path'. Return the result in
/// '*child' and its index among the children. Returns false if not found.
inline bool FindChild(const orc::Type& curr_node, const SchemaPath& child_path,
const orc::Type** child, int* field);
};
class OrcCollectionReader : public OrcComplexColumnReader {
public:
/// Constructor for top level reader
OrcCollectionReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner) : OrcComplexColumnReader(node, table_tuple_desc, scanner)
{ }
OrcCollectionReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
bool IsCollectionReader() const override { return true; }
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
/// Assemble the given 'tuple' by reading children values into it. The corresponding
/// children values are in the 'row_idx'-th collection. Each collection (List/Map) may
/// have variable number of tuples, we only read children values of the 'tuple_idx'-th
/// tuple.
virtual Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple,
MemPool* pool) const WARN_UNUSED_RESULT = 0;
};
class OrcListReader : public OrcCollectionReader {
public:
OrcListReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner);
OrcListReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
int GetNumTuples(int row_idx) const override;
int GetChildBatchOffset(int row_idx) const override;
Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
const override WARN_UNUSED_RESULT;
private:
orc::ListVectorBatch* batch_ = nullptr;
const SlotDescriptor* pos_slot_desc_ = nullptr;
int array_start_ = -1;
int array_idx_ = -1;
int array_end_ = -1;
void CreateChildForSlot(const orc::Type* node, const SlotDescriptor* slot_desc);
/// Used for top level readers. Advance current position (row_idx_ and array_idx_)
/// to the first tuple inside the next row.
void NextRow();
};
class OrcMapReader : public OrcCollectionReader {
public:
OrcMapReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc,
HdfsOrcScanner* scanner);
OrcMapReader(const orc::Type* node, const SlotDescriptor* slot_desc,
HdfsOrcScanner* scanner);
void UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) override;
Status TransferTuple(Tuple* tuple, MemPool* pool) override WARN_UNUSED_RESULT;
int GetNumTuples(int row_idx) const override;
int GetChildBatchOffset(int row_idx) const override;
Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
const override WARN_UNUSED_RESULT;
private:
orc::MapVectorBatch* batch_ = nullptr;
vector<OrcColumnReader*> key_readers_;
vector<OrcColumnReader*> value_readers_;
int array_offset_ = -1;
int array_end_ = -1;
void CreateChildForSlot(const orc::Type* orc_type, const SlotDescriptor* slot_desc);
/// Used for top level readers. Advance current position (row_idx_ and array_offset_)
/// to the first key/value pair in the next row.
void NextRow();
};
}
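A minimal sketch of the driver-loop pattern described in the OrcComplexColumnReader comment above. StubTopLevelReader and the int "tuples" are illustrative stand-ins only; the real top-level reader is created from the selected ORC schema and materializes impala::Tuple from orc::ColumnVectorBatch.

#include <cstddef>
#include <iostream>
#include <vector>

// Stub mirroring the subset of the top-level reader interface used by the loop:
// UpdateInputBatch() installs a new batch, EndOfBatch() reports whether all rows
// were transferred, TransferTuple() materializes one row and advances the
// internal position.
class StubTopLevelReader {
 public:
  void UpdateInputBatch(const std::vector<int>& batch) {
    batch_ = &batch;
    row_idx_ = 0;
  }
  bool EndOfBatch() const { return row_idx_ >= batch_->size(); }
  void TransferTuple(int* tuple) { *tuple = (*batch_)[row_idx_++]; }

 private:
  const std::vector<int>* batch_ = nullptr;
  std::size_t row_idx_ = 0;
};

int main() {
  StubTopLevelReader reader;
  std::vector<std::vector<int>> stripe_batches = {{1, 2}, {3}};
  for (const std::vector<int>& orc_batch : stripe_batches) {
    reader.UpdateInputBatch(orc_batch);  // new batch from the stripe
    while (!reader.EndOfBatch()) {
      int tuple = 0;                     // stands in for "Init tuple"
      reader.TransferTuple(&tuple);
      std::cout << tuple << "\n";
    }
  }
  return 0;
}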

View File

@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/orc-metadata-utils.h"
#include "util/debug-util.h"
#include "common/names.h"
namespace impala {
void OrcMetadataUtils::BuildSchemaPaths(const orc::Type& root, int num_partition_keys,
vector<SchemaPath>* paths) {
SchemaPath path;
paths->push_back(path);
DCHECK_EQ(root.getKind(), orc::TypeKind::STRUCT);
int num_columns = root.getSubtypeCount();
for (int i = 0; i < num_columns; ++i) {
path.push_back(i + num_partition_keys);
BuildSchemaPath(*root.getSubtype(i), &path, paths);
path.pop_back();
}
}
void OrcMetadataUtils::BuildSchemaPath(const orc::Type& node, SchemaPath* path,
vector<SchemaPath>* paths) {
DCHECK_EQ(paths->size(), node.getColumnId());
paths->push_back(*path);
if (node.getKind() == orc::TypeKind::STRUCT) {
int size = node.getSubtypeCount();
for (int i = 0; i < size; ++i) {
path->push_back(i);
const orc::Type* child = node.getSubtype(i);
BuildSchemaPath(*child, path, paths);
path->pop_back();
}
} else if (node.getKind() == orc::TypeKind::LIST) {
DCHECK_EQ(node.getSubtypeCount(), 1);
const orc::Type* child = node.getSubtype(0);
path->push_back(SchemaPathConstants::ARRAY_ITEM);
BuildSchemaPath(*child, path, paths);
path->pop_back();
} else if (node.getKind() == orc::TypeKind::MAP) {
DCHECK_EQ(node.getSubtypeCount(), 2);
const orc::Type* key_child = node.getSubtype(0);
const orc::Type* value_child = node.getSubtype(1);
path->push_back(SchemaPathConstants::MAP_KEY);
BuildSchemaPath(*key_child, path, paths);
(*path)[path->size() - 1] = SchemaPathConstants::MAP_VALUE;
BuildSchemaPath(*value_child, path, paths);
path->pop_back();
}
}
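// For example (illustrative): with one partition key and a file schema
//   struct<id:int,arr:array<struct<x:int,y:string>>>
// the ORC column ids are assigned in pre-order (0: root, 1: id, 2: arr,
// 3: arr's item struct, 4: x, 5: y), so BuildSchemaPaths() above produces
//   0 -> [], 1 -> [1], 2 -> [2], 3 -> [2, ARRAY_ITEM],
//   4 -> [2, ARRAY_ITEM, 0], 5 -> [2, ARRAY_ITEM, 1].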
Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
const orc::Type** node, bool* pos_field, bool* missing_field) const {
const ColumnType* table_col_type = nullptr;
*node = root_;
*pos_field = false;
*missing_field = false;
for (int i = 0; i < col_path.size(); ++i) {
int table_idx = col_path[i];
int file_idx = table_idx;
if (i == 0) {
table_col_type = &tbl_desc_.col_descs()[table_idx].type();
// For top-level columns, the first index in a path includes the table's partition
// keys.
file_idx -= tbl_desc_.num_clustering_cols();
} else if (table_col_type->type == TYPE_ARRAY &&
table_idx == SchemaPathConstants::ARRAY_POS) {
// To materialize the positions, the ORC lib has to materialize the whole array
// column.
*pos_field = true;
break; // return *node as the ARRAY node
} else {
table_col_type = &table_col_type->children[table_idx];
}
if (file_idx >= (*node)->getSubtypeCount()) {
*missing_field = true;
return Status::OK();
}
*node = (*node)->getSubtype(file_idx);
if (table_col_type->type == TYPE_ARRAY) {
DCHECK_EQ(table_col_type->children.size(), 1);
if ((*node)->getKind() != orc::TypeKind::LIST) {
return Status(Substitute("File '$0' has an incompatible ORC schema for column "
"'$1', Column type: $2, ORC schema:\\n$3", filename_,
PrintSubPath(tbl_desc_, col_path, i), "array", (*node)->toString()));
}
} else if (table_col_type->type == TYPE_MAP) {
DCHECK_EQ(table_col_type->children.size(), 2);
if ((*node)->getKind() != orc::TypeKind::MAP) {
return Status(Substitute("File '$0' has an incompatible ORC schema for column "
"'$1', Column type: $2, ORC schema:\\n$3", filename_,
PrintSubPath(tbl_desc_, col_path, i), "map", (*node)->toString()));
}
} else if (table_col_type->type == TYPE_STRUCT) {
DCHECK_GT(table_col_type->children.size(), 0);
} else {
DCHECK(!table_col_type->IsComplexType());
DCHECK_EQ(i, col_path.size() - 1);
RETURN_IF_ERROR(ValidateType(*table_col_type, **node));
}
}
return Status::OK();
}
Status OrcSchemaResolver::ValidateType(const ColumnType& type,
const orc::Type& orc_type) const {
switch (orc_type.getKind()) {
case orc::TypeKind::BOOLEAN:
if (type.type == TYPE_BOOLEAN) return Status::OK();
break;
case orc::TypeKind::BYTE:
if (type.type == TYPE_TINYINT || type.type == TYPE_SMALLINT
|| type.type == TYPE_INT || type.type == TYPE_BIGINT) {
return Status::OK();
}
break;
case orc::TypeKind::SHORT:
if (type.type == TYPE_SMALLINT || type.type == TYPE_INT
|| type.type == TYPE_BIGINT) {
return Status::OK();
}
break;
case orc::TypeKind::INT:
if (type.type == TYPE_INT || type.type == TYPE_BIGINT) return Status::OK();
break;
case orc::TypeKind::LONG:
if (type.type == TYPE_BIGINT) return Status::OK();
break;
case orc::TypeKind::FLOAT:
case orc::TypeKind::DOUBLE:
if (type.type == TYPE_FLOAT || type.type == TYPE_DOUBLE) return Status::OK();
break;
case orc::TypeKind::STRING:
case orc::TypeKind::VARCHAR:
case orc::TypeKind::CHAR:
if (type.type == TYPE_STRING || type.type == TYPE_VARCHAR
|| type.type == TYPE_CHAR) {
return Status::OK();
}
break;
case orc::TypeKind::TIMESTAMP:
if (type.type == TYPE_TIMESTAMP) return Status::OK();
break;
case orc::TypeKind::DECIMAL: {
if (type.type != TYPE_DECIMAL || type.scale != orc_type.getScale()) break;
bool overflow = false;
int orc_precision = orc_type.getPrecision();
if (orc_precision == 0 || orc_precision > ColumnType::MAX_DECIMAL8_PRECISION) {
// For ORC decimals whose precision is larger than 18, the values can't fit into
// an int64 (10^19 > 2^63), so we should use int128 (16 bytes) in this case.
// The possible byte sizes for Impala decimals are 4, 8, 16.
// We mark it as overflow if the target byte size is not 16.
overflow = (type.GetByteSize() != 16);
} else if (orc_type.getPrecision() > ColumnType::MAX_DECIMAL4_PRECISION) {
// For ORC decimals whose precision is <= 18 and > 9, both int64 and int128 can fit them.
// We only mark it as overflow if the target byte size is 4.
overflow = (type.GetByteSize() == 4);
}
if (!overflow) return Status::OK();
return Status(Substitute(
"Column $0 in ORC file '$1' can't be truncated to table column $2",
orc_type.toString(), filename_, type.DebugString()));
}
default: break;
}
return Status(Substitute(
"Type mismatch: table column $0 is map to column $1 in ORC file '$2'",
type.DebugString(), orc_type.toString(), filename_));
}
}
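A standalone sketch (not Impala code) of the decimal-precision rule that ValidateType() applies above, assuming Impala decimal storage sizes of 4, 8 and 16 bytes with 9 and 18 digits as the 4-byte and 8-byte precision limits; FitsInByteSize() is a hypothetical helper used only for illustration.

#include <iostream>

// Which Impala decimal byte sizes (4, 8 or 16) can hold an ORC decimal of the
// given precision without truncation? Precision 0 (unspecified) or > 18 needs
// 16 bytes; precision in (9, 18] rules out the 4-byte form; <= 9 fits anywhere.
static bool FitsInByteSize(int orc_precision, int impala_byte_size) {
  if (orc_precision == 0 || orc_precision > 18) return impala_byte_size == 16;
  if (orc_precision > 9) return impala_byte_size >= 8;
  return true;
}

int main() {
  std::cout << std::boolalpha;
  // decimal(10,0) in the file vs. a 4-byte DECIMAL(8,0) table column: rejected,
  // matching the mismatch_decimals test below.
  std::cout << FitsInByteSize(10, 4) << "\n";  // false
  std::cout << FitsInByteSize(10, 8) << "\n";  // true
  std::cout << FitsInByteSize(19, 8) << "\n";  // false
  return 0;
}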

View File

@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <orc/OrcFile.hh>
#include <queue>
#include "runtime/descriptors.h"
namespace impala {
/// Utils to build a map from each orc::Type id to a SchemaPath. The map will be used in
/// creating OrcColumnReaders.
class OrcMetadataUtils {
public:
static void BuildSchemaPaths(const orc::Type& root, int num_partition_keys,
std::vector<SchemaPath>* paths);
private:
static void BuildSchemaPath(const orc::Type& node, SchemaPath* path,
std::vector<SchemaPath>* paths);
};
/// Util class to resolve SchemaPaths of TupleDescriptors/SlotDescriptors into orc::Type.
class OrcSchemaResolver {
public:
OrcSchemaResolver(const HdfsTableDescriptor& tbl_desc, const orc::Type* root,
const char* filename) : tbl_desc_(tbl_desc), root_(root), filename_(filename) { }
/// Resolve SchemaPath into orc::Type (ORC column representation)
/// 'pos_field' is set to true if 'col_path' references the position (index) field of an
/// array column. '*node' will be the array node if 'pos_field' is set to true.
/// column. '*node' will be the array node if 'pos_field' is set to true.
/// 'missing_field' is set to true if the column is missing in the ORC file.
Status ResolveColumn(const SchemaPath& col_path, const orc::Type** node,
bool* pos_field, bool* missing_field) const;
private:
const HdfsTableDescriptor& tbl_desc_;
const orc::Type* const root_;
const char* const filename_ = nullptr;
/// Validate whether the ColumnType is compatible with the orc type
Status ValidateType(const ColumnType& type, const orc::Type& orc_type) const
WARN_UNUSED_RESULT;
};
}

View File

@@ -62,6 +62,11 @@ static void DecompressLocation(const impala::THdfsTable& thrift_table,
namespace impala {
const int SchemaPathConstants::ARRAY_ITEM;
const int SchemaPathConstants::ARRAY_POS;
const int SchemaPathConstants::MAP_KEY;
const int SchemaPathConstants::MAP_VALUE;
const int RowDescriptor::INVALID_IDX;
const char* TupleDescriptor::LLVM_CLASS_NAME = "class.impala::TupleDescriptor";

View File

@@ -66,7 +66,7 @@ public enum HdfsFileFormat {
ORC("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
"org.apache.hadoop.hive.ql.io.orc.OrcSerde",
false, true),
true, true),
KUDU("org.apache.kudu.mapreduce.KuduTableInputFormat",
"org.apache.kudu.mapreduce.KuduTableOutputFormat",
"", false, false);

23
testdata/ComplexTypesTbl/README vendored Normal file
View File

@@ -0,0 +1,23 @@
The two Parquet files (nullable.parq and nonnullable.parq) were generated as described
in testdata/data/schemas/nested/README.
The two ORC files (nullable.orc and nonnullable.orc) were generated by orc-tools, which
can convert JSON files into the ORC format. However, we need to modify nullable.json and
nonnullable.json to meet the format it requires: the whole file must not be a JSON array;
it must be one JSON object per row, joined by '\n'. Assume the modified JSON files are
nullable_orc.json and nonnullable_orc.json.
The ORC files can be regenerated by running the following commands in the current
directory:
wget https://search.maven.org/remotecontent?filepath=org/apache/orc/orc-tools/1.5.4/orc-tools-1.5.4-uber.jar \
-O orc-tools-1.5.4-uber.jar
java -jar orc-tools-1.5.4-uber.jar convert \
-s "struct<id:bigint,int_array:array<int>,int_array_Array:array<array<int>>,int_map:map<string,int>,int_Map_Array:array<map<string,int>>,nested_struct:struct<A:int,b:array<int>,C:struct<d:array<array<struct<E:int,F:string>>>>,g:map<string,struct<H:struct<i:array<double>>>>>>" \
-o nullable.orc \
nullable_orc.json
java -jar orc-tools-1.5.4-uber.jar convert \
-s "struct<ID:bigint,Int_Array:array<int>,int_array_array:array<array<int>>,Int_Map:map<string,int>,int_map_array:array<map<string,int>>,nested_Struct:struct<a:int,B:array<int>,c:struct<D:array<array<struct<e:int,f:string>>>>,G:map<string,struct<h:struct<i:array<double>>>>>>" \
-o nonnullable.orc \
nonnullable_orc.json

BIN
testdata/ComplexTypesTbl/nonnullable.orc vendored Normal file

Binary file not shown.

BIN
testdata/ComplexTypesTbl/nullable.orc vendored Normal file

Binary file not shown.

View File

@@ -577,8 +577,12 @@ if [ $SKIP_METADATA_LOAD -eq 0 ]; then
if [[ -n "$CM_HOST" ]]; then
LOAD_NESTED_ARGS="--cm-host $CM_HOST"
fi
run-step "Loading nested data" load-nested.log \
${IMPALA_HOME}/testdata/bin/load_nested.py ${LOAD_NESTED_ARGS:-}
run-step "Loading nested parquet data" load-nested.log \
${IMPALA_HOME}/testdata/bin/load_nested.py \
-t tpch_nested_parquet -f parquet/none ${LOAD_NESTED_ARGS:-}
run-step "Loading nested orc data" load-nested.log \
${IMPALA_HOME}/testdata/bin/load_nested.py \
-t tpch_nested_orc_def -f orc/def ${LOAD_NESTED_ARGS:-}
run-step "Loading auxiliary workloads" load-aux-workloads.log load-aux-workloads
run-step "Loading dependent tables" copy-and-load-dependent-tables.log \
copy-and-load-dependent-tables

View File

@@ -29,10 +29,39 @@ import tests.comparison.cli_options as cli_options
LOG = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])
# We use Hive to transform nested text tables into parquet/orc tables. To control the
# compression, the table property key differs between file formats (see
# COMPRESSION_KEYS_MAP). The property values also differ from the short names used in
# Impala (e.g. snap, def), so we define COMPRESSION_VALUES_MAP for the mapping.
COMPRESSION_KEYS_MAP = {
"parquet": "parquet.compression",
"orc": "orc.compress"
}
COMPRESSION_VALUES_MAP = {
# Currently, only three codecs are supported in Hive for Parquet. See the code in
# org.apache.parquet.hadoop.metadata.CompressionCodecName (parquet-hadoop-bundle)
"parquet": {
"none": "SNAPPY",
"snap": "SNAPPY",
"gzip": "GZIP",
"lzo": "LZO"
},
# Currently, only three codecs are supported in Hive for ORC. See the Hive code in
# org.apache.orc.impl.WriterImpl#createCodec (in module hive-orc)
"orc": {
"none": "NONE",
"def": "ZLIB",
"snap": "SNAPPY"
}
}
# These vars are set after arg parsing.
cluster = None
source_db = None
target_db = None
file_format = None
compression_key = None
compression_value = None
chunks = None
def is_loaded():
@@ -58,6 +87,9 @@ def load():
sql_params = {
"source_db": source_db,
"target_db": target_db,
"file_format": file_format,
"compression_key": compression_key,
"compression_value": compression_value,
"chunks": chunks,
"warehouse_dir": cluster.hive.warehouse_dir}
@@ -240,17 +272,26 @@ def load():
LOCATION '{warehouse_dir}/{target_db}.db/tmp_supplier_string'"""\
.format(**sql_params))
# The part table doesn't have nesting.
# The part table doesn't have nesting. If it's a parquet table, we create it in Impala
LOG.info("Creating parts")
impala.execute("""
CREATE EXTERNAL TABLE part
STORED AS PARQUET
AS SELECT * FROM {source_db}.part""".format(**sql_params))
if file_format == "parquet":
impala.execute("""
CREATE EXTERNAL TABLE part
STORED AS PARQUET
AS SELECT * FROM {source_db}.part""".format(**sql_params))
cluster.hdfs.ensure_home_dir()
if file_format == "orc":
# For ORC format, we create the 'part' table by Hive
with cluster.hive.cursor(db_name=target_db) as hive:
hive.execute("""
CREATE TABLE part
STORED AS ORC
TBLPROPERTIES('{compression_key}'='{compression_value}')
AS SELECT * FROM {source_db}.part""".format(**sql_params))
# Hive is used to convert the data into parquet and drop all the temp tables.
# Hive is used to convert the data into parquet/orc and drop all the temp tables.
# The Hive SET values are necessary to prevent Impala remote reads of parquet files.
# These values are taken from http://blog.cloudera.com/blog/2014/12/the-impala-cookbook.
cluster.hdfs.ensure_home_dir()
with cluster.hive.cursor(db_name=target_db) as hive:
LOG.info("Converting temp tables")
for stmt in """
@@ -259,19 +300,19 @@ def load():
SET dfs.block.size=1073741824;
CREATE TABLE customer
STORED AS PARQUET
TBLPROPERTIES('parquet.compression'='SNAPPY')
STORED AS {file_format}
TBLPROPERTIES('{compression_key}'='{compression_value}')
AS SELECT * FROM tmp_customer;
CREATE TABLE region
STORED AS PARQUET
TBLPROPERTIES('parquet.compression'='SNAPPY')
STORED AS {file_format}
TBLPROPERTIES('{compression_key}'='{compression_value}')
AS SELECT * FROM tmp_region;
CREATE TABLE supplier
STORED AS PARQUET
TBLPROPERTIES('parquet.compression'='SNAPPY')
AS SELECT * FROM tmp_supplier;""".split(";"):
STORED AS {file_format}
TBLPROPERTIES('{compression_key}'='{compression_value}')
AS SELECT * FROM tmp_supplier;""".format(**sql_params).split(";"):
if not stmt.strip():
continue
LOG.info("Executing: {0}".format(stmt))
@@ -314,6 +355,7 @@ if __name__ == "__main__":
parser.add_argument("-s", "--source-db", default="tpch_parquet")
parser.add_argument("-t", "--target-db", default="tpch_nested_parquet")
parser.add_argument("-f", "--table-format", default="parquet/none") # can be "orc/def"
parser.add_argument("-c", "-p", "--chunks", type=int, default=1)
args = parser.parse_args()
@@ -323,6 +365,18 @@ if __name__ == "__main__":
cluster = cli_options.create_cluster(args)
source_db = args.source_db
target_db = args.target_db
file_format, compression_value = args.table_format.split("/")
# 'compression_value' is one of [none,def,gzip,bzip,snap,lzo]. We translate it into a
# value that can be passed to Hive.
if file_format not in COMPRESSION_KEYS_MAP:
raise Exception("Nested types in file format %s are not supported" % file_format)
compression_key = COMPRESSION_KEYS_MAP[file_format]
if compression_value not in COMPRESSION_VALUES_MAP[file_format]:
raise Exception("Loading %s tables in %s compression is not supported by Hive. "
"Supported compressions: %s"
% (file_format, compression_value,
str(COMPRESSION_VALUES_MAP[file_format].keys())))
compression_value = COMPRESSION_VALUES_MAP[file_format][compression_value]
chunks = args.chunks
if is_loaded():

View File

@@ -699,7 +699,12 @@ nested_struct struct<a: int, b: array<int>, c: struct<d: array<array<struct<e: i
hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nullable.parq \
/test-warehouse/complextypestbl_parquet/ && \
hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nonnullable.parq \
/test-warehouse/complextypestbl_parquet/
/test-warehouse/complextypestbl_parquet/ && \
hadoop fs -mkdir -p /test-warehouse/complextypestbl_orc_def && \
hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nullable.orc \
/test-warehouse/complextypestbl_orc_def/ && \
hadoop fs -put -f ${IMPALA_HOME}/testdata/ComplexTypesTbl/nonnullable.orc \
/test-warehouse/complextypestbl_orc_def/
---- LOAD
====
---- DATASET

View File

@@ -71,7 +71,9 @@ table_name:complextypes_multifileformat, constraint:restrict_to, table_format:te
# TODO: Avro
table_name:complextypestbl, constraint:restrict_to, table_format:parquet/none/none
table_name:complextypestbl, constraint:restrict_to, table_format:orc/def/block
table_name:complextypestbl_medium, constraint:restrict_to, table_format:parquet/none/none
table_name:complextypestbl_medium, constraint:restrict_to, table_format:orc/def/block
table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none
table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none

View File

@@ -20,70 +20,57 @@ PLAN-ROOT SINK
predicates: !empty(t.a)
row-size=24B cardinality=unavailable
====
# Complex types are not supported on ORC.
# Scanning an unpartitioned ORC table with complex types plans ok.
select 1 from functional_orc_def.complextypes_fileformat t, t.a
---- PLAN
NotImplementedException: Scan of table 't' in format 'ORC' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
====
select s.f1 from functional_orc_def.complextypes_fileformat t, t.m
---- PLAN
NotImplementedException: Scan of table 't' in format 'ORC' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
====
# Complex types are not supported on ORC, however queries materializing
# only scalar type columns are allowed.
select id from functional_orc_def.complextypes_fileformat
---- PLAN
PLAN-ROOT SINK
|
00:SCAN HDFS [functional_orc_def.complextypes_fileformat]
partitions=1/1 files=1 size=621B
row-size=4B cardinality=unavailable
====
# Complex types are not supported on ORC but count(*) and similar
# queries should work.
select count(*) from functional_orc_def.complextypes_fileformat
---- PLAN
PLAN-ROOT SINK
01:SUBPLAN
| row-size=12B cardinality=unavailable
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
|--04:NESTED LOOP JOIN [CROSS JOIN]
| | row-size=12B cardinality=10
| |
| |--02:SINGULAR ROW SRC
| | row-size=12B cardinality=1
| |
| 03:UNNEST [t.a]
| row-size=0B cardinality=10
|
00:SCAN HDFS [functional_orc_def.complextypes_fileformat]
00:SCAN HDFS [functional_orc_def.complextypes_fileformat t]
partitions=1/1 files=1 size=621B
row-size=0B cardinality=unavailable
predicates: !empty(t.a)
row-size=12B cardinality=unavailable
====
# Complex types are not supported on Avro.
select s.f1 from functional_avro_snap.complextypes_fileformat t, t.a
---- PLAN
NotImplementedException: Scan of table 't' in format 'AVRO' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Complex types are not supported on text files.
select s.f1 from functional.complextypes_fileformat t, t.a
---- PLAN
NotImplementedException: Scan of table 't' in format 'TEXT' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Complex types are not supported on text files, even if no complex-typed
# columns are selected.
select 1 from functional.complextypes_fileformat
---- PLAN
NotImplementedException: Scan of table 'functional.complextypes_fileformat' in format 'TEXT' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Complex types are not supported on RC files.
select 1 from functional_rc_snap.complextypes_fileformat t, t.a
---- PLAN
NotImplementedException: Scan of table 't' in format 'RC_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
select s.f1 from functional_rc_snap.complextypes_fileformat t, t.m
---- PLAN
NotImplementedException: Scan of table 't' in format 'RC_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Complex types are not supported on RC files, however queries materializing
# only scalar type columns are allowed.
@@ -113,7 +100,7 @@ PLAN-ROOT SINK
select s.f1 from functional_seq_snap.complextypes_fileformat t, t.a
---- PLAN
NotImplementedException: Scan of table 't' in format 'SEQUENCE_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Queries referencing only scalar typed columns on sequence files
# are allowed.
@@ -147,14 +134,14 @@ PLAN-ROOT SINK
select id from functional_hbase.allcomplextypes t, t.int_array_col
---- PLAN
NotImplementedException: Scan of table 't.int_array_col' is not supported because 't' references a nested field/collection.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Scanning an HBase table with complex-types columns fails if a complex-typed
# column is selected.
select complex_struct_col.f1 from functional_hbase.allcomplextypes
---- PLAN
NotImplementedException: Scan of table 'functional_hbase.allcomplextypes.complex_struct_col.f1' is not supported because 'functional_hbase.allcomplextypes' references a nested field/collection.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# The complextypes_multifileformat has five partitions with different file formats:
# p=1 text
@@ -166,7 +153,7 @@ Complex types are supported for these file formats: PARQUET.
select 1 from functional.complextypes_multifileformat where p = 1
---- PLAN
NotImplementedException: Scan of partition 'p=1' in format 'TEXT' of table 'functional.complextypes_multifileformat' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Scanning a Parquet partition of a multi-format table with complex types plans ok.
select s.f1 from functional.complextypes_multifileformat t, t.a where p = 2
@@ -195,13 +182,13 @@ PLAN-ROOT SINK
select s.f1 from functional.complextypes_multifileformat t, t.a where p = 3
---- PLAN
NotImplementedException: Scan of partition 'p=3' in format 'AVRO' of table 't' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Scanning an RC file partition of a multi-format table with complex types fails.
select id from functional.complextypes_multifileformat t, t.a where p = 4
---- PLAN
NotImplementedException: Scan of partition 'p=4' in format 'RC_FILE' of table 't' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
Complex types are supported for these file formats: PARQUET, ORC.
====
# Complex types are not supported on RC files but count(*) and similar
# queries should work.
@@ -218,24 +205,26 @@ PLAN-ROOT SINK
partitions=1/5 files=1 size=128B
row-size=0B cardinality=unavailable
====
# Scanning an ORC file partition of a multi-format table with complex types fails.
# Scanning an ORC file partition of a multi-format table with complex types plans ok.
select id from functional.complextypes_multifileformat t, t.a where p = 5
---- PLAN
NotImplementedException: Scan of partition 'p=5' in format 'ORC' of table 't' is not supported because the table has a column 's' with a complex type 'STRUCT<f1:STRING,f2:INT>'.
Complex types are supported for these file formats: PARQUET.
====
# Complex types are not supported on ORC files but count(*) and similar
# queries should work.
select count(*) from functional.complextypes_multifileformat where p = 5
---- PLAN
PLAN-ROOT SINK
|
01:AGGREGATE [FINALIZE]
| output: count(*)
| row-size=8B cardinality=1
01:SUBPLAN
| row-size=16B cardinality=unavailable
|
00:SCAN HDFS [functional.complextypes_multifileformat]
|--04:NESTED LOOP JOIN [CROSS JOIN]
| | row-size=16B cardinality=10
| |
| |--02:SINGULAR ROW SRC
| | row-size=16B cardinality=1
| |
| 03:UNNEST [t.a]
| row-size=0B cardinality=10
|
00:SCAN HDFS [functional.complextypes_multifileformat t]
partition predicates: p = 5
partitions=1/5 files=1 size=128B
row-size=0B cardinality=unavailable
predicates: !empty(t.a)
row-size=16B cardinality=unavailable
====

View File

@@ -80,12 +80,12 @@ decimal
132842
====
---- QUERY
select d2 from mismatch_decimals
select d2 from mismatch_decimals where d6 = 1
---- TYPES
decimal
---- RESULTS
---- CATCH
It can't be truncated to table column DECIMAL(8,0) for column decimal(10,0) in ORC file
Column decimal(10,0) in ORC file '$NAMENODE/test-warehouse/decimal_tbl_orc_def/d6=1/000000_0' can't be truncated to table column DECIMAL(8,0)
====
---- QUERY
select d3 from mismatch_decimals

View File

@@ -1,12 +1,5 @@
====
---- QUERY
# Test maximally nested struct.
create external table $DATABASE.struct_tbl
like parquet '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/struct/file.parq'
stored as parquet
location '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/struct/'
====
---- QUERY
select f.
f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.
f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.
@@ -20,13 +13,6 @@ from $DATABASE.struct_tbl
int
====
---- QUERY
# Test maximally nested array.
create external table $DATABASE.int_array_tbl
like parquet '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/int_array/file.parq'
stored as parquet
location '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/int_array/'
====
---- QUERY
# Test absolute table ref executed with a single scan.
select * from $DATABASE.int_array_tbl.f.
item.item.item.item.item.item.item.item.item.item.
@@ -73,13 +59,6 @@ select * from $DATABASE.int_array_tbl.f t0,
int
====
---- QUERY
# Test maximally nested array of struct.
create external table $DATABASE.struct_array_tbl
like parquet '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/struct_array/file.parq'
stored as parquet
location '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/struct_array/'
====
---- QUERY
# Test absolute table ref executed with a single scan.
select * from $DATABASE.struct_array_tbl.f.
f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.
@@ -109,13 +88,6 @@ select * from $DATABASE.struct_array_tbl.f t0,
int
====
---- QUERY
# Test maximally nested map.
create external table $DATABASE.int_map_tbl
like parquet '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/int_map/file.parq'
stored as parquet
location '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/int_map/'
====
---- QUERY
# Test absolute table ref executed with a single scan.
select t.value from $DATABASE.int_map_tbl.f.
value.value.value.value.value.value.value.value.value.value.
@@ -162,13 +134,6 @@ select t98.value from $DATABASE.int_map_tbl.f t0,
int
====
---- QUERY
# Test maximally nested map of struct.
create external table $DATABASE.struct_map_tbl
like parquet '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/struct_map/file.parq'
stored as parquet
location '$FILESYSTEM_PREFIX/test-warehouse/$DATABASE.db/max_nesting_depth/struct_map/'
====
---- QUERY
# Test absolute table ref executed with a single scan.
select t.value from $DATABASE.struct_map_tbl.f.
f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.f0.

View File

@@ -1,8 +1,5 @@
====
---- QUERY
use tpch_nested_parquet
====
---- QUERY
# Basic subplan with an unnest + join.
select c_custkey, c_mktsegment, o_orderkey, o_orderdate
from customer c, c.c_orders o
@@ -255,7 +252,7 @@ bigint,string,bigint,decimal,decimal,string,string
---- QUERY
# Test a left outer join inside a subplan.
select count(okey), opriority
from tpch_nested_parquet.customer c,
from customer c,
(select ca.o_orderkey okey, ca.o_orderpriority opriority
from c.c_orders ca left outer join c.c_orders cb
on ca.o_orderkey = cb.o_orderkey) v
@@ -319,7 +316,7 @@ bigint,string,bigint,string
---- QUERY
# Test a right outer join inside a subplan.
select count(okey), opriority
from tpch_nested_parquet.customer c,
from customer c,
(select ca.o_orderkey okey, ca.o_orderpriority opriority
from c.c_orders ca right outer join c.c_orders cb
on ca.o_orderkey = cb.o_orderkey
@@ -523,7 +520,7 @@ bigint,string,bigint,decimal,string
---- QUERY
# Test uncorrelated NOT IN subquery with a relative table ref.
select c_custkey, c_mktsegment, o_orderkey, o_totalprice, o_orderdate
from tpch_nested_parquet.customer c, c.c_orders o
from customer c, c.c_orders o
where c_custkey < 10
and o_orderdate like "1992%"
and cast(o_orderdate as timestamp) + interval 13 days not in
@@ -597,7 +594,7 @@ bigint, bigint
---- QUERY
# IMPALA-3678: Union in a subplan - passthrough should be disabled.
select count(c.c_custkey), count(v.tot_price)
from tpch_nested_parquet.customer c, (
from customer c, (
select sum(o_totalprice) tot_price from c.c_orders
union
select sum(o_totalprice) tot_price from c.c_orders
@@ -610,7 +607,7 @@ bigint,bigint
---- QUERY
# IMPALA-5363: Reset probe_batch_ after reaching limit.
# Query with a partitioned hash join inside a subplan
select count(*) FROM tpch_nested_parquet.customer c, (SELECT ca.o_orderkey okey,
select count(*) FROM customer c, (SELECT ca.o_orderkey okey,
ca.o_orderpriority opriority FROM c.c_orders ca, c.c_orders cb
WHERE ca.o_orderkey = cb.o_orderkey limit 2) v limit 51
---- RESULTS
@@ -621,7 +618,7 @@ bigint
---- QUERY
# IMPALA-5363: Reset probe_batch_ after reaching limit.
# Query with a Nested loop join inside a subplan
select count(*) FROM tpch_nested_parquet.customer c, (SELECT ca.o_orderkey okey,
select count(*) FROM customer c, (SELECT ca.o_orderkey okey,
ca.o_orderpriority opriority FROM c.c_orders ca, c.c_orders cb
WHERE ca.o_orderkey < cb.o_orderkey limit 2) v limit 51
---- RESULTS
@@ -633,7 +630,7 @@ bigint
# IMPALA-5438: Union with constant exprs in a subplan. The 'c_custkey % 100' was chosen
# to have all impalads produce results to make sure the constant exprs in the union are
# evaluated regardless of which fragment instance they are in.
select c_custkey, order_cnt, union_cnt from tpch_nested_parquet.customer c,
select c_custkey, order_cnt, union_cnt from customer c,
(select count(o_orderkey) order_cnt from c.c_orders) v,
(select count(o_orderkey) union_cnt from (
select o_orderkey from c.c_orders
@@ -647,7 +644,7 @@ bigint,bigint,bigint
---- QUERY
# IMPALA-2368: Resetting nested subplans works correctly with an analytic sort.
select count(o_orderkey)
from tpch_nested_parquet.customer c
from customer c
inner join c.c_orders o
where c_custkey < 10 and c_custkey in
(select lead(l.l_linenumber) over (partition by l.l_shipdate order by l.l_linenumber)

View File

@@ -1,8 +1,5 @@
====
---- QUERY
use tpch_nested_parquet
====
---- QUERY
# IMPALA-3652: test limit on a hash join in a subplan where resources need to be
# transferred in Reset()
select count(*)

View File

@@ -1,14 +1,11 @@
====
---- QUERY
use tpch_nested_parquet
====
---- QUERY
# IMPALA-2376: run scan that constructs large collection and set memory limit low enough
# to get the below query to consistently fail when allocating a large collection. Set
# num_nodes to 1 in the python test and mt_dop to 1 here in order to make the query as
# deterministic as possible. mem_limit is tuned for a 3-node HDFS minicluster.
set buffer_pool_limit=24m;
set mem_limit=35m;
set mem_limit=28m;
set mt_dop=1;
select max(cnt1), max(cnt2), max(cnt3), max(cnt4), max(cnt5)
from customer c,
@@ -18,5 +15,5 @@ from customer c,
---- TYPES
BIGINT
---- CATCH
row_regex: .*Memory limit exceeded: Failed to allocate [0-9]+ bytes for collection 'tpch_nested_parquet.customer.c_orders.item.o_lineitems'.*
row_regex: .*Memory limit exceeded: Failed to allocate [0-9]+ bytes for collection 'tpch_nested_.*.customer.c_orders.item.o_lineitems'.*
====

View File

@@ -1,15 +1,12 @@
====
---- QUERY
use tpch_nested_parquet
====
---- QUERY
# IMPALA-2473: Scan query with large row size leading to oversized batches.
# Should not OOM given single scanner thread and sufficient memory limit.
# mem_limit is tuned for a 3-node HDFS minicluster.
set num_scanner_threads=1;
set mem_limit=1g;
select *
from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
from customer c, c.c_orders o, o.o_lineitems
order by l_partkey desc, l_suppkey desc, l_linenumber desc, c_custkey
limit 5
---- RESULTS
@@ -28,7 +25,7 @@ bigint,string,string,smallint,string,decimal,string,string,bigint,string,decimal
set num_scanner_threads=1;
set mem_limit=500m;
select *
from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
from customer c, c.c_orders o, o.o_lineitems
where c_phone='20-968-632-1388' and l_partkey = 127499
---- RESULTS
53722,'Customer#000053722','p2hQ7009dQSEuRdE31HtjVv9idN2rXBHdTx4JRk',10,'20-968-632-1388',4153.75,'HOUSEHOLD','l foxes are ideas. final deposits boost blithely. carefully bo',10277,'F',101242.81,'1994-04-08','3-MEDIUM','Clerk#000000655',0,'counts sleep around the special',127499,2524,1,12.00,18317.88,0.07,0.07,'A','F','1994-07-27','1994-05-22','1994-08-11','NONE','RAIL',' according to the furiously silent th'

View File

@@ -1,10 +1,7 @@
====
---- QUERY
use tpch_nested_parquet
====
---- QUERY
select count(*)
from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems i
from customer c, c.c_orders o, o.o_lineitems i
where c_custkey in (1, 2) and o_orderkey in (4808192, 1374019)
---- RESULTS
7
@@ -13,7 +10,7 @@ bigint
====
---- QUERY
select count(*)
from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems i
from customer c, c.c_orders o, o.o_lineitems i
---- RESULTS
6001215
---- TYPES
@@ -21,7 +18,7 @@ bigint
====
---- QUERY
select count(l_linenumber)
from tpch_nested_parquet.customer.c_orders.o_lineitems i
from customer.c_orders.o_lineitems i
---- RESULTS
6001215
---- TYPES
@@ -29,7 +26,7 @@ bigint
====
---- QUERY
select count(*)
from tpch_nested_parquet.customer.c_orders.o_lineitems i
from customer.c_orders.o_lineitems i
---- RESULTS
6001215
---- TYPES
@@ -61,7 +58,7 @@ order by c_custkey
bigint,bigint
====
---- QUERY
select c_custkey from tpch_nested_parquet.customer c left anti join c.c_orders
select c_custkey from customer c left anti join c.c_orders
where c_custkey < 10 order by c_custkey
---- RESULTS
3
@@ -73,7 +70,7 @@ bigint
---- QUERY
# Exercise TOP-N node with array data
select c_custkey, c_name, o_orderkey, o_orderpriority
from tpch_nested_parquet.customer c, c.c_orders o
from customer c, c.c_orders o
where c_custkey < 100
order by o_orderpriority, o_orderkey
limit 10;
@@ -96,8 +93,8 @@ bigint,string,bigint,string
# In this query no slots of the supplier.s_partssupps array are materialized, so there
# are NULL tuples once the array is unnested.
SELECT t2.s_name
FROM tpch_nested_parquet.customer t1
INNER JOIN tpch_nested_parquet.supplier t2
FROM customer t1
INNER JOIN supplier t2
ON t2.s_phone = t1.c_phone AND t2.s_acctbal = t1.c_acctbal,
t2.s_partsupps t3
WHERE t2.s_suppkey > t2.s_nationkey;
@@ -150,7 +147,7 @@ BIGINT,BIGINT
# generate many rows. This test stresses the order by and should not crash.
SELECT o_orderkey, r FROM
(SELECT o2.o_orderkey, COUNT(o1.pos) OVER (ORDER BY o2.o_orderkey DESC) r
FROM tpch_nested_parquet.customer c
FROM customer c
JOIN c.c_orders o1
JOIN c.c_orders o2 ON (o1.pos = o2.pos)) v
ORDER BY o_orderkey LIMIT 1
@@ -164,7 +161,7 @@ BIGINT,BIGINT
# build side of a nested loop join. Reproduces a memory transfer bug triggered by empty
# row batches in the build side of the join.
select straight_join c_custkey, cnt1
from tpch_nested_parquet.customer c,
from customer c,
(select count(*) cnt1 from c.c_orders) v
where cnt1 = 1
order by c_custkey

View File

@@ -1,2 +1,3 @@
# Manually created file.
file_format:parquet, dataset:tpch_nested, compression_codec: none, compression_type: none
file_format: parquet, dataset: tpch_nested, compression_codec: none, compression_type: none
file_format: orc, dataset: tpch_nested, compression_codec: def, compression_type: block
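The table-format spec lines above are flat key: value pairs separated by commas, one table format per line. As a rough, hypothetical sketch (illustrative only, not the parser Impala's test framework actually uses), such a line could be read like this:

# Hypothetical helper, not part of this patch: parse one spec line such as
# "file_format: orc, dataset: tpch_nested, compression_codec: def, compression_type: block"
# into a dict of its key/value pairs.
def parse_table_format_line(line):
    spec = {}
    for pair in line.split(','):
        pair = pair.strip()
        if not pair or pair.startswith('#'):   # skip empty pieces and comment lines
            continue
        key, _, value = pair.partition(':')
        spec[key.strip()] = value.strip()
    return spec

print(parse_table_format_line(
    "file_format: orc, dataset: tpch_nested, compression_codec: def, compression_type: block"))
# {'file_format': 'orc', 'dataset': 'tpch_nested',
#  'compression_codec': 'def', 'compression_type': 'block'}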

View File

@@ -1,4 +1,4 @@
file_format: parquet
file_format: parquet,orc
dataset: tpch_nested
compression_codec: none
compression_type: none

View File

@@ -1,2 +1,3 @@
# Generated File.
file_format: parquet, dataset: tpch_nested, compression_codec: none, compression_type: none
file_format: orc, dataset: tpch_nested, compression_codec: def, compression_type: block

View File

@@ -1,2 +1,3 @@
# Generated File.
file_format: parquet, dataset: tpch_parquet, compression_codec: none, compression_type: none
file_format: orc, dataset: tpch_nested, compression_codec: def, compression_type: block

View File

@@ -19,6 +19,7 @@
import os
from copy import deepcopy
import pytest
from subprocess import check_call
from pytest import skip
@@ -45,7 +46,7 @@ class TestNestedTypes(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestNestedTypes, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')
v.get_value('table_format').file_format in ['parquet', 'orc'])
def test_scanner_basic(self, vector):
"""Queries that do not materialize arrays."""
@@ -74,7 +75,9 @@ class TestNestedTypes(ImpalaTestSuite):
def test_subplan(self, vector):
"""Test subplans with various exec nodes inside it."""
self.run_test_case('QueryTest/nested-types-subplan', vector)
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-subplan', vector,
use_db='tpch_nested' + db_suffix)
def test_subplan_single_node(self, vector):
"""Test subplans with various exec nodes inside it and num_nodes=1."""
@@ -84,22 +87,30 @@ class TestNestedTypes(ImpalaTestSuite):
def test_with_clause(self, vector):
"""Queries using nested types and with WITH clause."""
self.run_test_case('QueryTest/nested-types-with-clause', vector)
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-with-clause', vector,
use_db='tpch_nested' + db_suffix)
def test_tpch(self, vector):
"""Queries over the larger nested TPCH dataset."""
self.run_test_case('QueryTest/nested-types-tpch', vector)
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch', vector,
use_db='tpch_nested' + db_suffix)
def test_tpch_limit(self, vector):
"""Queries over the larger nested TPCH dataset with limits in their subplan."""
vector.get_value('exec_option')['batch_size'] = 10
self.run_test_case('QueryTest/nested-types-tpch-limit', vector)
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch-limit', vector,
use_db='tpch_nested' + db_suffix)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
def test_tpch_mem_limit(self, vector):
"""Queries over the larger nested TPCH dataset with memory limits tuned for
a 3-node HDFS minicluster."""
self.run_test_case('QueryTest/nested-types-tpch-mem-limit', vector)
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch-mem-limit', vector,
use_db='tpch_nested' + db_suffix)
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
def test_tpch_mem_limit_single_node(self, vector):
@@ -107,11 +118,16 @@ class TestNestedTypes(ImpalaTestSuite):
a 3-node HDFS minicluster with num_nodes=1."""
new_vector = deepcopy(vector)
new_vector.get_value('exec_option')['num_nodes'] = 1
self.run_test_case('QueryTest/nested-types-tpch-mem-limit-single-node', new_vector)
db_suffix = vector.get_value('table_format').db_suffix()
self.run_test_case('QueryTest/nested-types-tpch-mem-limit-single-node',
new_vector, use_db='tpch_nested' + db_suffix)
@SkipIfEC.fix_later
def test_parquet_stats(self, vector):
"""Queries that test evaluation of Parquet row group statistics."""
if vector.get_value('table_format').file_format == 'orc':
pytest.skip('Predicate push down on ORC stripe statistics is not supported ' +
'(IMPALA-6505)')
self.run_test_case('QueryTest/nested-types-parquet-stats', vector)
@SkipIfIsilon.hive
@@ -134,6 +150,9 @@ class TestNestedTypes(ImpalaTestSuite):
"""IMPALA-6370: Test that a partitioned table with nested types can be scanned."""
table = "complextypes_partitioned"
db_table = "{0}.{1}".format(unique_database, table)
table_format_info = vector.get_value('table_format') # type: TableFormatInfo
file_format = table_format_info.file_format
db_suffix = table_format_info.db_suffix()
self.client.execute("""
CREATE EXTERNAL TABLE {0} (
id BIGINT,
@@ -150,12 +169,12 @@ class TestNestedTypes(ImpalaTestSuite):
PARTITIONED BY (
part int
)
STORED AS PARQUET""".format(db_table))
STORED AS {1}""".format(db_table, file_format))
# Add multiple partitions pointing to the complextypes_tbl data.
for partition in [1, 2]:
self.client.execute("ALTER TABLE {0} ADD PARTITION(part={1}) LOCATION '{2}'".format(
db_table, partition,
self._get_table_location("functional_parquet.complextypestbl", vector)))
self._get_table_location("functional%s.complextypestbl" % db_suffix, vector)))
self.run_test_case('QueryTest/nested-types-basic-partitioned', vector,
unique_database)
@@ -572,6 +591,8 @@ class TestParquetArrayEncodings(ImpalaTestSuite):
class TestMaxNestingDepth(ImpalaTestSuite):
# Should be kept in sync with the FE's Type.MAX_NESTING_DEPTH
MAX_NESTING_DEPTH = 100
TABLES = ['struct', 'int_array', 'struct_array', 'int_map', 'struct_map']
TEMP_TABLE_SUFFIX = '_parquet'
@classmethod
def get_workload(self):
@@ -581,15 +602,44 @@ class TestMaxNestingDepth(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestMaxNestingDepth, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format == 'parquet')
v.get_value('table_format').file_format in ['parquet', 'orc'])
def test_max_nesting_depth(self, vector, unique_database):
"""Tests that Impala can scan Parquet files having complex types of
"""Tests that Impala can scan Parquet and ORC files having complex types of
the maximum nesting depth."""
file_format = vector.get_value('table_format').file_format
if file_format == 'parquet':
self.__create_parquet_tables(unique_database)
elif file_format == 'orc':
self.__create_orc_tables(unique_database)
self.run_test_case('QueryTest/max-nesting-depth', vector, unique_database)
def __create_parquet_tables(self, unique_database, as_target=True):
"""Create Parquet tables from files. If 'as_target' is False, the Parquet tables will
be used to create ORC tables, so we add a suffix in the table names."""
check_call(["hdfs", "dfs", "-copyFromLocal",
"%s/testdata/max_nesting_depth" % os.environ['IMPALA_HOME'],
"%s/%s.db/" % (WAREHOUSE, unique_database)], shell=False)
self.run_test_case('QueryTest/max-nesting-depth', vector, unique_database)
tbl_suffix = '' if as_target else self.TEMP_TABLE_SUFFIX
for tbl in self.TABLES:
tbl_name = "%s.%s_tbl%s" % (unique_database, tbl, tbl_suffix)
tbl_location = "%s/%s.db/max_nesting_depth/%s/" % (WAREHOUSE, unique_database, tbl)
create_table = "CREATE EXTERNAL TABLE %s LIKE PARQUET '%s' STORED AS PARQUET" \
" location '%s'" % (tbl_name, tbl_location + 'file.parq', tbl_location)
self.client.execute(create_table)
def __create_orc_tables(self, unique_database):
# Creating ORC tables directly from ORC files is not supported yet (IMPALA-8046),
# so we create the Parquet tables first and then convert them into ORC tables via Hive.
self.__create_parquet_tables(unique_database, False)
for tbl in self.TABLES:
tbl_name = "%s.%s_tbl" % (unique_database, tbl)
from_tbl_name = tbl_name + self.TEMP_TABLE_SUFFIX
create_table = "CREATE TABLE %s LIKE %s STORED AS ORC" % (tbl_name, from_tbl_name)
insert_table = "INSERT INTO %s SELECT * FROM %s" % (tbl_name, from_tbl_name)
self.run_stmt_in_hive(create_table)
self.run_stmt_in_hive(insert_table)
self.client.execute("INVALIDATE METADATA %s" % tbl_name)
@SkipIfIsilon.hive
@SkipIfS3.hive
@@ -602,8 +652,9 @@ class TestMaxNestingDepth(ImpalaTestSuite):
# Type with a nesting depth of MAX_NESTING_DEPTH + 1
type_sql = ("array<" * self.MAX_NESTING_DEPTH) + "int" +\
(">" * self.MAX_NESTING_DEPTH)
create_table_sql = "CREATE TABLE %s.above_max_depth (f %s) STORED AS PARQUET" %\
(unique_database, type_sql)
file_format = vector.get_value('table_format').file_format
create_table_sql = "CREATE TABLE %s.above_max_depth (f %s) STORED AS %s" %\
(unique_database, type_sql, file_format)
self.run_stmt_in_hive(create_table_sql)
self.client.execute("invalidate metadata %s.above_max_depth" % unique_database)
try:

View File

@@ -29,9 +29,9 @@ class TestTpchNestedQuery(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestTpchNestedQuery, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
# The nested tpch data is currently only available in parquet.
# The nested tpch data is currently only available in parquet and orc.
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format in ['parquet'])
v.get_value('table_format').file_format in ['parquet', 'orc'])
def test_tpch_q1(self, vector):
self.run_test_case(self.get_workload() + '-q1', vector)