From dce68e6a3bc610e050f23eae62356ccb9cd712ec Mon Sep 17 00:00:00 2001 From: Tamas Mate Date: Mon, 21 Aug 2023 14:28:37 +0200 Subject: [PATCH] IMPALA-11996: Scanner change for Iceberg metadata querying This commit adds a scan node for querying Iceberg metadata tables. The scan node creates a Java scanner object that creates and scans the metadata table. The scanner uses the Iceberg API to scan the table after that the scan node fetches the rows one by one and materialises them into RowBatches. The Iceberg row reader on the backend does the translation between Iceberg and Impala types. There is only one fragment created to query the Iceberg metadata table which is supposed to be executed on the coordinator node that already has the Iceberg table loaded. This way there is no need for further table loading on the executor side. This change will not cover nested column types, these slots are set to NULL, it will be done in IMPALA-12205. Testing: - Added e2e tests for querying metadata tables - Updated planner tests Performance testing: Created a table and inserted ~5500 rows one by one, this generated ~270000 ALL_MANIFESTS metadata table records. This table is quite wide and has a String column as well. I only mention count(*) test on ALL_MANIFESTS, because every row is materialized in every scenario currently: - Cold cache: 15.76s - IcebergApiScanTime: 124.407ms - MaterializeTupleTime: 8s368ms - Warm cache: 7.56s - IcebergApiScanTime: 3.646ms - MaterializeTupleTime: 7s477ms Change-Id: I0e943cecd77f5ef7af7cd07e2b596f2c5b4331e7 Reviewed-on: http://gerrit.cloudera.org:8080/20010 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- be/CMakeLists.txt | 2 + be/src/exec/CMakeLists.txt | 1 + be/src/exec/exec-node.cc | 4 + be/src/exec/iceberg-metadata/CMakeLists.txt | 29 ++ .../iceberg-metadata-scan-node.cc | 180 +++++++ .../iceberg-metadata-scan-node.h | 126 +++++ .../iceberg-metadata/iceberg-row-reader.cc | 166 ++++++ .../iceberg-metadata/iceberg-row-reader.h | 87 ++++ be/src/scheduling/scheduler.cc | 2 +- be/src/service/frontend.cc | 6 + be/src/service/frontend.h | 5 + be/src/service/impalad-main.cc | 4 + be/src/util/jni-util.cc | 14 + be/src/util/jni-util.h | 30 ++ common/thrift/PlanNodes.thrift | 35 +- .../org/apache/impala/analysis/Analyzer.java | 2 +- .../analysis/IcebergMetadataTableRef.java | 10 + .../java/org/apache/impala/analysis/Path.java | 4 + .../catalog/iceberg/IcebergMetadataTable.java | 12 +- .../impala/planner/DistributedPlanner.java | 15 +- .../planner/IcebergMetadataScanNode.java | 32 +- .../impala/planner/IcebergScanPlanner.java | 4 +- .../impala/planner/SingleNodePlanner.java | 12 +- .../org/apache/impala/service/Frontend.java | 8 - .../apache/impala/service/JniFrontend.java | 11 + .../impala/util/IcebergMetadataScanner.java | 115 +++++ .../functional/functional_schema_template.sql | 17 + .../functional/schema_constraints.csv | 1 + .../iceberg-metadata-table-scan.test | 117 +++-- .../QueryTest/iceberg-metadata-tables.test | 484 +++++++++++++++--- tests/authorization/test_ranger.py | 38 ++ tests/query_test/test_iceberg.py | 13 +- 32 files changed, 1419 insertions(+), 167 deletions(-) create mode 100644 be/src/exec/iceberg-metadata/CMakeLists.txt create mode 100644 be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc create mode 100644 be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h create mode 100644 be/src/exec/iceberg-metadata/iceberg-row-reader.cc create mode 100644 be/src/exec/iceberg-metadata/iceberg-row-reader.h create mode 100644 
fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 8c4a4cf0b..4532eb9e9 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -495,6 +495,7 @@ set (IMPALA_LIBS ExecRcfile ExecSequence ExecText + ExecIcebergMetadata Exprs ExprsIr GlobalFlags @@ -594,6 +595,7 @@ if (BUILD_SHARED_LIBS) ExecRcfile ExecSequence ExecText + ExecIcebergMetadata CodeGen Exprs Rpc diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 12e2534e5..3f4fb4c87 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -24,6 +24,7 @@ add_subdirectory(parquet) add_subdirectory(rcfile) add_subdirectory(sequence) add_subdirectory(text) +add_subdirectory(iceberg-metadata) # where to put generated libraries set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 7779cf2c7..745d9d51c 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -38,6 +38,7 @@ #include "exec/hdfs-scan-node-mt.h" #include "exec/hdfs-scan-node.h" #include "exec/iceberg-delete-node.h" +#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h" #include "exec/kudu/kudu-scan-node-mt.h" #include "exec/kudu/kudu-scan-node.h" #include "exec/kudu/kudu-util.h" @@ -226,6 +227,9 @@ Status PlanNode::CreatePlanNode( case TPlanNodeType::ICEBERG_DELETE_NODE: *node = pool->Add(new IcebergDeletePlanNode()); break; + case TPlanNodeType::ICEBERG_METADATA_SCAN_NODE: + *node = pool->Add(new IcebergMetadataScanPlanNode()); + break; default: map::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); diff --git a/be/src/exec/iceberg-metadata/CMakeLists.txt b/be/src/exec/iceberg-metadata/CMakeLists.txt new file mode 100644 index 000000000..ea63de71a --- /dev/null +++ b/be/src/exec/iceberg-metadata/CMakeLists.txt @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/iceberg-metadata") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/iceberg-metadata") + +add_library(ExecIcebergMetadata + iceberg-metadata-scan-node.cc + iceberg-row-reader.cc +) + +add_dependencies(ExecIcebergMetadata gen-deps) diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc new file mode 100644 index 000000000..1e1ba76a5 --- /dev/null +++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h" +#include "exec/exec-node.inline.h" +#include "runtime/exec-env.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" +#include "service/frontend.h" +#include "util/jni-util.h" + +using namespace impala; + +Status IcebergMetadataScanPlanNode::CreateExecNode( + RuntimeState* state, ExecNode** node) const { + ObjectPool* pool = state->obj_pool(); + *node = pool->Add(new IcebergMetadataScanNode(pool, *this, state->desc_tbl())); + return Status::OK(); +} + +IcebergMetadataScanNode::IcebergMetadataScanNode(ObjectPool* pool, + const IcebergMetadataScanPlanNode& pnode, const DescriptorTbl& descs) + : ScanNode(pool, pnode, descs), + tuple_id_(pnode.tnode_->iceberg_scan_metadata_node.tuple_id), + table_name_(pnode.tnode_->iceberg_scan_metadata_node.table_name), + metadata_table_name_(pnode.tnode_->iceberg_scan_metadata_node.metadata_table_name) {} + +Status IcebergMetadataScanNode::InitJNI() { + DCHECK(impala_iceberg_metadata_scanner_cl_ == nullptr) << "InitJNI() already called!"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + // Global class references: + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/impala/util/IcebergMetadataScanner", + &impala_iceberg_metadata_scanner_cl_)); + // Method ids: + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "", "(Lorg/apache/impala/catalog/FeIcebergTable;Ljava/lang/String;)V", + &iceberg_metadata_scanner_ctor_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "ScanMetadataTable", "()V", &scan_metadata_table_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "GetNext", "()Lorg/apache/iceberg/StructLike;", &get_next_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "GetAccessor", "(I)Lorg/apache/iceberg/Accessor;", &get_accessor_)); + return Status::OK(); +} + +Status IcebergMetadataScanNode::Prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::Prepare(state)); + scan_prepare_timer_ = ADD_TIMER(runtime_profile(), "ScanPrepareTime"); + iceberg_api_scan_timer_ = ADD_TIMER(runtime_profile(), "IcebergApiScanTime"); + SCOPED_TIMER(scan_prepare_timer_); + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); + if (tuple_desc_ == nullptr) { + return Status("Failed to get tuple descriptor, tuple id: " + + std::to_string(tuple_id_)); + } + // Get the FeTable object from the Frontend + jobject jtable; + RETURN_IF_ERROR(GetCatalogTable(&jtable)); + // Create the Java Scanner object and scan the table + jstring jstr_metadata_table_name = 
env->NewStringUTF(metadata_table_name_.c_str()); + jobject jmetadata_scanner = env->NewObject(impala_iceberg_metadata_scanner_cl_, + iceberg_metadata_scanner_ctor_, jtable, jstr_metadata_table_name); + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jmetadata_scanner, &jmetadata_scanner_)); + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(ScanMetadataTable()); + // Create field accessors + for (SlotDescriptor* slot_desc: tuple_desc_->slots()) { + jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_, + get_accessor_, slot_desc->col_pos()); + RETURN_ERROR_IF_EXC(env); + jobject accessor_for_field_global_ref; + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field, + &accessor_for_field_global_ref)); + jaccessors_[slot_desc->col_pos()] = accessor_for_field_global_ref; + } + return Status::OK(); +} + +Status IcebergMetadataScanNode::Open(RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::Open(state)); + iceberg_row_reader_.reset(new IcebergRowReader(tuple_desc_, jaccessors_)); + return Status::OK(); +} + +Status IcebergMetadataScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, + bool* eos) { + RETURN_IF_CANCELLED(state); + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + SCOPED_TIMER(materialize_tuple_timer()); + // Allocate buffer for RowBatch and init the tuple + uint8_t* tuple_buffer; + int64_t tuple_buffer_size; + RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, + &tuple_buffer)); + Tuple* tuple = reinterpret_cast(tuple_buffer); + tuple->Init(tuple_buffer_size); + while (!row_batch->AtCapacity()) { + int row_idx = row_batch->AddRow(); + TupleRow* tuple_row = row_batch->GetRow(row_idx); + tuple_row->SetTuple(0, tuple); + // Get the next row from 'org.apache.impala.util.IcebergMetadataScanner' + jobject struct_like_row = env->CallObjectMethod(jmetadata_scanner_, get_next_); + RETURN_ERROR_IF_EXC(env); + // When 'struct_like_row' is null, there are no more rows to read + if (struct_like_row == nullptr) { + *eos = true; + return Status::OK(); + } + // Translate a StructLikeRow from Iceberg to Tuple + RETURN_IF_ERROR(iceberg_row_reader_->MaterializeRow(env, struct_like_row, tuple, + row_batch->tuple_data_pool())); + env->DeleteLocalRef(struct_like_row); + RETURN_ERROR_IF_EXC(env); + COUNTER_ADD(rows_read_counter(), 1); + + // Evaluate conjuncts on this tuple row + if (ExecNode::EvalConjuncts(conjunct_evals().data(), + conjunct_evals().size(), tuple_row)) { + row_batch->CommitLastRow(); + tuple = reinterpret_cast( + reinterpret_cast(tuple) + tuple_desc_->byte_size()); + } else { + // Reset the null bits, everyhing else will be overwritten + Tuple::ClearNullBits(tuple, tuple_desc_->null_bytes_offset(), + tuple_desc_->num_null_bytes()); + } + } + return Status::OK(); +} + +void IcebergMetadataScanNode::Close(RuntimeState* state) { + if (is_closed()) return; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env != nullptr) { + // Close global references + if (jmetadata_scanner_ != nullptr) env->DeleteGlobalRef(jmetadata_scanner_); + for (auto accessor : jaccessors_) { + if (accessor.second != nullptr) env->DeleteGlobalRef(accessor.second); + } + } else { + LOG(ERROR) << "Couldn't get JNIEnv, unable to release Global JNI references"; + } + ScanNode::Close(state); +} + +Status IcebergMetadataScanNode::GetCatalogTable(jobject* jtable) { + Frontend* fe = ExecEnv::GetInstance()->frontend(); + RETURN_IF_ERROR(fe->GetCatalogTable(table_name_, jtable)); + return 
Status::OK(); +} + +Status IcebergMetadataScanNode::ScanMetadataTable() { + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + SCOPED_TIMER(iceberg_api_scan_timer_); + env->CallObjectMethod(jmetadata_scanner_, scan_metadata_table_); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h new file mode 100644 index 000000000..c994c7682 --- /dev/null +++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/iceberg-metadata/iceberg-row-reader.h" +#include "exec/scan-node.h" + +#include + +namespace impala { + +class ExecNode; +class IcebergRowReader; +class RuntimeState; +class Status; + +/// Scan node for an Iceberg metadata table. +/// Iceberg API provides predefined metadata tables, these tables can be scanned through +/// the Iceberg API just like any other regular Iceberg tables. Although, Impala utilizes +/// its Parquet scanner to scan Iceberg data, due to the virtual nature of the metadata +/// tables these should be scanned with the Iceberg API. +/// +/// For scanning these metadata tables this scanner calls into the JVM and creates an +/// 'IcebergMetadataScanner' object that does the scanning. Once the Iceberg scan is done, +/// the scan node starts fetching the rows one by one and materializes the Iceberg rows +/// into Impala rowbatches. +/// +/// The flow of scanning is: +/// 1. Backend: gets the FeIcebergTable object from the frontend +/// 2. Backend: creates an IcebergMetadataScanner object on the Java heap +/// 3. Backend: triggers a metadata table creation and scan on the Frontend +/// 4. Frontend: creates the metadata table and executes the scan +/// 5. Backend: calls GetNext that calls the IcebergMetadataScanner's GetNext +/// 6. Frontend: IcebergMetadataScanner's GetNext iterates over the result set and returns +/// a row in StructLike format +/// 7. Backend: converts and materializes the returned StructLike object into RowBatch +/// +/// Note: +/// This scan node should be executed on the coordinator, because it depends on the +/// frontend's table cache. +class IcebergMetadataScanPlanNode : public ScanPlanNode { + public: + Status CreateExecNode(RuntimeState* state, ExecNode** node) const override; + ~IcebergMetadataScanPlanNode(){} +}; + +class IcebergMetadataScanNode : public ScanNode { + public: + IcebergMetadataScanNode(ObjectPool* pool, const IcebergMetadataScanPlanNode& pnode, + const DescriptorTbl& descs); + + /// JNI setup. Creates global references for Java classes and find method ids. 
+ /// Initializes static members, should be called once per process lifecycle. + static Status InitJNI() WARN_UNUSED_RESULT; + + /// Initializes counters, executes Iceberg table scan and initializes accessors. + Status Prepare(RuntimeState* state) override; + + /// Creates the Iceberg row reader. + Status Open(RuntimeState* state) override; + + /// Fills the next rowbatch with the results returned by the Iceberg scan. + Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override; + + /// Finalize and close this operator. + void Close(RuntimeState* state) override; + + private: + /// Global class references created with JniUtil. + inline static jclass impala_iceberg_metadata_scanner_cl_ = nullptr; + + /// Method references created with JniUtil. + inline static jmethodID iceberg_metadata_scanner_ctor_ = nullptr; + inline static jmethodID scan_metadata_table_ = nullptr; + inline static jmethodID get_accessor_ = nullptr; + inline static jmethodID get_next_ = nullptr; + + /// Iceberg metadata scanner Java object, it helps preparing the metadata table and + /// executes an Iceberg table scan. Allows the ScanNode to fetch the metadata from + /// the Java Heap. + jobject jmetadata_scanner_; + + /// Helper class to transform Iceberg rows to Impala tuples. + std::unique_ptr iceberg_row_reader_; + + /// Accessor map for the scan result, pairs the slot ids with the java Accessor + /// objects. + std::unordered_map jaccessors_; + + /// Tuple id resolved in Prepare() to set 'tuple_desc_'. + TupleId tuple_id_; + + /// Descriptor of tuples read from Iceberg metadata table. + const TupleDescriptor* tuple_desc_ = nullptr; + + /// Table and metadata table names. + const TTableName table_name_; + const string metadata_table_name_; + + /// Iceberg metadata scan specific counters. + RuntimeProfile::Counter* scan_prepare_timer_; + RuntimeProfile::Counter* iceberg_api_scan_timer_; + + /// Initializes the metadata table and executes an Iceberg scan through JNI. + Status ScanMetadataTable(); + + /// Gets the FeIceberg table from the Frontend. + Status GetCatalogTable(jobject* jtable); +}; + +} diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc new file mode 100644 index 000000000..b033d8bda --- /dev/null +++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
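As a reference for the scan flow documented in iceberg-metadata-scan-node.h above, here is a minimal, illustrative Java sketch of how the frontend-side scanner is driven. It is not part of the patch: in the real flow the backend invokes these methods through JNI from Prepare() and GetNext(), and the FeIcebergTable handle ('tbl') and the column index 0 are assumptions made only for this example.

  import org.apache.iceberg.Accessor;
  import org.apache.iceberg.StructLike;
  import org.apache.impala.catalog.FeIcebergTable;
  import org.apache.impala.util.IcebergMetadataScanner;

  class MetadataScanSketch {
    static void scanAllManifests(FeIcebergTable tbl) {
      // Create the scanner for one of the predefined metadata tables and run the scan.
      IcebergMetadataScanner scanner = new IcebergMetadataScanner(tbl, "ALL_MANIFESTS");
      scanner.ScanMetadataTable();
      // Accessors are resolved once per slot and then reused for every row.
      Accessor<StructLike> firstColumn = scanner.GetAccessor(0);
      StructLike row;
      while ((row = scanner.GetNext()) != null) {
        // Values come back boxed (Boolean/Integer/Long/CharSequence) or as null.
        Object value = firstColumn.get(row);
      }
    }
  }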
+ +#include "exec/exec-node.inline.h" +#include "exec/iceberg-metadata/iceberg-row-reader.h" +#include "runtime/runtime-state.h" +#include "runtime/timestamp-value.inline.h" +#include "runtime/tuple-row.h" +#include "util/jni-util.h" + +namespace impala { + +IcebergRowReader::IcebergRowReader( + const TupleDescriptor* tuple_desc, const std::unordered_map& jaccessor) + : tuple_desc_(tuple_desc), + jaccessors_(jaccessor) {} + +Status IcebergRowReader::InitJNI() { + DCHECK(iceberg_accessor_cl_ == nullptr) << "InitJNI() already called!"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + // Global class references: + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/iceberg/Accessor", &iceberg_accessor_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/util/List", &list_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/iceberg/types/Types$NestedField", &iceberg_nested_field_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/Boolean", &java_boolean_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/Integer", &java_int_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/Long", &java_long_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/CharSequence", &java_char_sequence_cl_)); + + // Method ids: + RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "get", + "(I)Ljava/lang/Object;", &list_get_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_accessor_cl_, "get", + "(Ljava/lang/Object;)Ljava/lang/Object;", &iceberg_accessor_get_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_boolean_cl_, "booleanValue", "()Z", + &boolean_value_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_int_cl_, "intValue", "()I", + &int_value_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_long_cl_, "longValue", "()J", + &long_value_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_char_sequence_cl_, "toString", + "()Ljava/lang/String;", &char_sequence_to_string_)); + return Status::OK(); +} + +Status IcebergRowReader::MaterializeRow(JNIEnv* env, + jobject struct_like_row, Tuple* tuple, MemPool* tuple_data_pool) { + DCHECK(env != nullptr); + DCHECK(struct_like_row != nullptr); + DCHECK(tuple != nullptr); + DCHECK(tuple_data_pool != nullptr); + for (SlotDescriptor* slot_desc: tuple_desc_->slots()) { + jobject accessed_value = env->CallObjectMethod(jaccessors_.at(slot_desc->col_pos()), + iceberg_accessor_get_, struct_like_row); + RETURN_ERROR_IF_EXC(env); + if (accessed_value == nullptr) { + tuple->SetNull(slot_desc->null_indicator_offset()); + continue; + } + void* slot = tuple->GetSlot(slot_desc->tuple_offset()); + switch (slot_desc->type().type) { + case TYPE_BOOLEAN: { // java.lang.Boolean + RETURN_IF_ERROR(WriteBooleanSlot(env, accessed_value, slot)); + break; + } case TYPE_INT: { // java.lang.Integer + RETURN_IF_ERROR(WriteIntSlot(env, accessed_value, slot)); + break; + } case TYPE_BIGINT: { // java.lang.Long + RETURN_IF_ERROR(WriteLongSlot(env, accessed_value, slot)); + break; + } case TYPE_TIMESTAMP: { // org.apache.iceberg.types.TimestampType + RETURN_IF_ERROR(WriteTimeStampSlot(env, accessed_value, slot)); + break; + } case TYPE_STRING: { // java.lang.String + RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot, tuple_data_pool)); + break; + } + default: + // Skip the unsupported type and set it to NULL + tuple->SetNull(slot_desc->null_indicator_offset()); + VLOG(3) << "Skipping unsupported column type: " << 
slot_desc->type().type; + } + } + return Status::OK(); +} + +Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value, + void* slot) { + DCHECK(env->IsInstanceOf(accessed_value, java_boolean_cl_) == JNI_TRUE); + jboolean result = env->CallBooleanMethod(accessed_value, boolean_value_); + RETURN_ERROR_IF_EXC(env); + *reinterpret_cast(slot) = (bool)(result == JNI_TRUE); + return Status::OK(); +} + +Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot) { + DCHECK(env->IsInstanceOf(accessed_value, java_int_cl_) == JNI_TRUE); + jint result = env->CallIntMethod(accessed_value, int_value_); + RETURN_ERROR_IF_EXC(env); + *reinterpret_cast(slot) = reinterpret_cast(result); + return Status::OK(); +} + +Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot) { + DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE); + jlong result = env->CallLongMethod(accessed_value, long_value_); + RETURN_ERROR_IF_EXC(env); + *reinterpret_cast(slot) = reinterpret_cast(result); + return Status::OK(); +} + +Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject accessed_value, + void* slot) { + DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE); + jlong result = env->CallLongMethod(accessed_value, long_value_); + RETURN_ERROR_IF_EXC(env); + *reinterpret_cast(slot) = TimestampValue::FromUnixTimeMicros(result, + UTCPTR); + return Status::OK(); +} + +Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot, + MemPool* tuple_data_pool) { + DCHECK(env->IsInstanceOf(accessed_value, java_char_sequence_cl_) == JNI_TRUE); + jstring result = static_cast(env->CallObjectMethod(accessed_value, + char_sequence_to_string_)); + RETURN_ERROR_IF_EXC(env); + JniUtfCharGuard str_guard; + RETURN_IF_ERROR(JniUtfCharGuard::create(env, result, &str_guard)); + // Allocate memory and copy the string from the JVM to the RowBatch + int str_len = strlen(str_guard.get()); + char* buffer = reinterpret_cast(tuple_data_pool->TryAllocateUnaligned(str_len)); + if (UNLIKELY(buffer == nullptr)) { + string details = strings::Substitute("Failed to allocate $1 bytes for string.", + str_len); + return tuple_data_pool->mem_tracker()->MemLimitExceeded(nullptr, details, str_len); + } + memcpy(buffer, str_guard.get(), str_len); + reinterpret_cast(slot)->ptr = buffer; + reinterpret_cast(slot)->len = str_len; + return Status::OK(); +} + +} \ No newline at end of file diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.h b/be/src/exec/iceberg-metadata/iceberg-row-reader.h new file mode 100644 index 000000000..15c71a038 --- /dev/null +++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.h @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <jni.h>
+#include <unordered_map>
+
+namespace impala {
+
+class MemPool;
+class Status;
+class SlotDescriptor;
+class Tuple;
+class TupleDescriptor;
+
+/// Row reader for Iceberg metadata table scans; it translates a {StructLike} Java
+/// object into Impala rows. It uses the provided {Accessor} objects to do this
+/// translation.
+class IcebergRowReader {
+ public:
+  /// Initialize the tuple descriptor and accessors
+  IcebergRowReader(const TupleDescriptor* tuple_desc,
+      const std::unordered_map<int, jobject>& jaccessors);
+
+  /// JNI setup. Creates global references for Java classes and finds method ids.
+  /// Initializes static members, should be called once per process lifecycle.
+  static Status InitJNI();
+
+  /// Materializes the StructLike Java objects into Impala rows.
+  Status MaterializeRow(JNIEnv* env, jobject struct_like_row, Tuple* tuple,
+      MemPool* tuple_data_pool);
+
+ private:
+  /// Global class references created with JniUtil.
+  inline static jclass iceberg_accessor_cl_ = nullptr;
+  inline static jclass iceberg_nested_field_cl_ = nullptr;
+  inline static jclass list_cl_ = nullptr;
+  inline static jclass java_boolean_cl_ = nullptr;
+  inline static jclass java_int_cl_ = nullptr;
+  inline static jclass java_long_cl_ = nullptr;
+  inline static jclass java_char_sequence_cl_ = nullptr;
+
+  /// Method references created with JniUtil.
+  inline static jmethodID iceberg_accessor_get_ = nullptr;
+  inline static jmethodID list_get_ = nullptr;
+  inline static jmethodID boolean_value_ = nullptr;
+  inline static jmethodID int_value_ = nullptr;
+  inline static jmethodID long_value_ = nullptr;
+  inline static jmethodID char_sequence_to_string_ = nullptr;
+
+  /// TupleDescriptor received from the ScanNode.
+  const TupleDescriptor* tuple_desc_;
+
+  /// Accessor map for the scan result; pairs the slot ids with the Java Accessor
+  /// objects.
+  const std::unordered_map<int, jobject> jaccessors_;
+
+  /// Reads the value of a primitive from the StructLike, translates it to a matching
+  /// Impala type and writes it into the target tuple. The related Accessor objects are
+  /// stored in the jaccessors_ map and created during Prepare.
+  Status WriteBooleanSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  Status WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  Status WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  /// Iceberg TimeStamp is parsed into TimestampValue.
+  Status WriteTimeStampSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  /// To obtain a character sequence from JNI, the JniUtfCharGuard class is used. The
+  /// data then has to be copied into the tuple_data_pool, because the JVM releases the
+  /// reference and reclaims the memory area.
+  Status WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
+      MemPool* tuple_data_pool);
+};
+
+}
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d5ebed49d..d42ec883b 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -65,7 +65,7 @@ static const string SCHEDULER_WARNING_KEY("Scheduler Warning");
 static const vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE,
     TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
-    TPlanNodeType::KUDU_SCAN_NODE};
+    TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::ICEBERG_METADATA_SCAN_NODE};
 // Consistent scheduling requires picking up to k distinct candidates out of n nodes.
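Relatedly, the Write*Slot() helpers declared in iceberg-row-reader.h above consume the boxed values returned by Iceberg's Accessor.get(). A minimal Java-side sketch of the assumed mapping follows; it is illustrative only, and in particular it assumes, as WriteTimeStampSlot() does, that Iceberg timestamps arrive as microseconds since the Unix epoch in UTC.

  import java.time.Instant;
  import java.time.temporal.ChronoUnit;

  class AccessorValueSketch {
    // 'value' stands for the result of Accessor.get(row) for a single slot.
    static void mapValue(Object value) {
      if (value == null) {
        // NULL: only the slot's null indicator is set.
      } else if (value instanceof Boolean) {       // -> BOOLEAN slot
        boolean b = (Boolean) value;
      } else if (value instanceof Integer) {       // -> INT slot
        int i = (Integer) value;
      } else if (value instanceof Long) {          // -> BIGINT slot, or TIMESTAMP:
        long micros = (Long) value;
        Instant ts = Instant.EPOCH.plus(micros, ChronoUnit.MICROS);
      } else if (value instanceof CharSequence) {  // -> STRING slot, copied into the row batch's pool
        String s = value.toString();
      } else {
        // Nested and other types are not handled yet; the slot is set to NULL (IMPALA-12205).
      }
    }
  }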
// Since each iteration can pick a node that it already picked (i.e. it is sampling with diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc index 4dcfd2506..65f830e88 100644 --- a/be/src/service/frontend.cc +++ b/be/src/service/frontend.cc @@ -113,6 +113,8 @@ Frontend::Frontend() { {"getFunctions", "([B)[B", &get_functions_id_}, {"getTableHistory", "([B)[B", &get_table_history_id_}, {"getCatalogObject", "([B)[B", &get_catalog_object_id_}, + {"getCatalogTable", "([B)Lorg/apache/impala/catalog/FeTable;", + &get_catalog_table_id_}, {"getRoles", "([B)[B", &show_roles_id_}, {"getPrincipalPrivileges", "([B)[B", &get_principal_privileges_id_}, {"execHiveServer2MetadataOp", "([B)[B", &exec_hs2_metadata_op_id_}, @@ -253,6 +255,10 @@ Status Frontend::GetCatalogObject(const TCatalogObject& req, return JniUtil::CallJniMethod(fe_, get_catalog_object_id_, req, resp); } +Status Frontend::GetCatalogTable(const TTableName& table_name, jobject* result) { + return JniUtil::CallJniMethod(fe_, get_catalog_table_id_, table_name, result); +} + Status Frontend::GetExecRequest( const TQueryCtx& query_ctx, TExecRequest* result) { return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result); diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h index 1df8dc44b..d450b0945 100644 --- a/be/src/service/frontend.h +++ b/be/src/service/frontend.h @@ -119,6 +119,10 @@ class Frontend { /// information on the error will be returned. Status GetCatalogObject(const TCatalogObject& request, TCatalogObject* response); + /// Gets the Java object of a Catalog table object. It can be used to call Java methods + /// of the Catalog Table object. + Status GetCatalogTable(const TTableName& table_name, jobject *result); + /// Call FE to get the roles. 
Status ShowRoles(const TShowRolesParams& params, TShowRolesResult* result); @@ -246,6 +250,7 @@ class Frontend { jmethodID get_functions_id_; // JniFrontend.getFunctions jmethodID get_table_history_id_; // JniFrontend.getTableHistory jmethodID get_catalog_object_id_; // JniFrontend.getCatalogObject + jmethodID get_catalog_table_id_; // JniFrontend.getCatalogTable jmethodID show_roles_id_; // JniFrontend.getRoles jmethodID get_principal_privileges_id_; // JniFrontend.getPrincipalPrivileges jmethodID exec_hs2_metadata_op_id_; // JniFrontend.execHiveServer2MetadataOp diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc index 0390714de..ed9159ba2 100644 --- a/be/src/service/impalad-main.cc +++ b/be/src/service/impalad-main.cc @@ -29,6 +29,8 @@ #include "common/status.h" #include "exec/hbase/hbase-table-scanner.h" #include "exec/hbase/hbase-table-writer.h" +#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h" +#include "exec/iceberg-metadata/iceberg-row-reader.h" #include "exprs/hive-udf-call.h" #include "exprs/timezone_db.h" #include "gen-cpp/ImpalaService.h" @@ -64,6 +66,8 @@ int ImpaladMain(int argc, char** argv) { ABORT_IF_ERROR(HBaseTableScanner::Init()); ABORT_IF_ERROR(HBaseTable::InitJNI()); ABORT_IF_ERROR(HBaseTableWriter::InitJNI()); + ABORT_IF_ERROR(IcebergMetadataScanNode::InitJNI()); + ABORT_IF_ERROR(IcebergRowReader::InitJNI()); ABORT_IF_ERROR(HiveUdfCall::InitEnv()); ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI()); InitFeSupport(); diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc index 5a00e43c8..5b7dc19bd 100644 --- a/be/src/util/jni-util.cc +++ b/be/src/util/jni-util.cc @@ -112,6 +112,20 @@ bool JniUtil::MethodExists(JNIEnv* env, jclass class_ref, const char* method_str return true; } +Status JniUtil::GetMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref) { + *method_ref = env->GetMethodID(class_ref, method_str, method_signature); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +Status JniUtil::GetStaticMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref) { + *method_ref = env->GetStaticMethodID(class_ref, method_str, method_signature); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) { *class_ref = NULL; jclass local_cl = env->FindClass(class_str); diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index f56ce404b..5005315bb 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -226,6 +226,8 @@ class JniCall { Status ObjectToResult(jobject obj, std::string* result) WARN_UNUSED_RESULT; + Status ObjectToResult(jobject obj, jobject* result) WARN_UNUSED_RESULT; + const jmethodID method_; JNIEnv* const env_; JniLocalFrame frame_; @@ -281,6 +283,16 @@ class JniUtil { static bool MethodExists(JNIEnv* env, jclass class_ref, const char* method_str, const char* method_signature); + /// Wrapper method around JNI's 'GetMethodID'. Returns the method reference for the + /// requested method. + static Status GetMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref); + + /// Wrapper method around JNI's 'GetStaticMethodID'. Returns the method reference for + /// the requested method. 
+ static Status GetStaticMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref); + /// Returns a global JNI reference to the class specified by class_str into class_ref. /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have /// the lifetime of the impalad process). @@ -377,6 +389,10 @@ class JniUtil { static Status CallJniMethod(const jobject& obj, const jmethodID& method, R* response) WARN_UNUSED_RESULT; + template + static Status CallJniMethod(const jobject& obj, const jmethodID& method, + const T& arg, jobject* response) WARN_UNUSED_RESULT; + private: // Slow-path for GetJNIEnv, used on the first call by any thread. static JNIEnv* GetJNIEnvSlowPath(); @@ -442,6 +458,12 @@ inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method return JniCall::instance_method(obj, method).Call(response); } +template +inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method, + const T& arg, jobject* response) { + return JniCall::instance_method(obj, method).with_thrift_arg(arg).Call(response); +} + inline JniCall::JniCall(jmethodID method) : method_(method), env_(JniUtil::GetJNIEnv()) { @@ -506,6 +528,14 @@ inline Status JniCall::ObjectToResult(jobject obj, std::string* result) { return Status::OK(); } +inline Status JniCall::ObjectToResult(jobject obj, jobject* result) { + DCHECK(obj) << "Call returned unexpected null Thrift object"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, obj, result)); + return Status::OK(); +} + } #endif diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 120f6c66c..9914c66b5 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -52,6 +52,7 @@ enum TPlanNodeType { CARDINALITY_CHECK_NODE = 16 MULTI_AGGREGATION_NODE = 17 ICEBERG_DELETE_NODE = 18 + ICEBERG_METADATA_SCAN_NODE = 19 } // phases of an execution node @@ -679,6 +680,15 @@ struct TCardinalityCheckNode { 1: required string display_statement } +struct TIcebergMetadataScanNode { + // Tuple ID of the Tuple this scanner should scan. + 1: required Types.TTupleId tuple_id + // Name of the FeIcebergTable that will be used for the metadata table scan. + 2: required CatalogObjects.TTableName table_name + // The metadata table name specifies which metadata table has to be scanned. + 3: required string metadata_table_name; +} + // See PipelineMembership in the frontend for details. struct TPipelineMembership { 1: required Types.TPlanNodeId pipe_id @@ -705,38 +715,39 @@ struct TPlanNode { // node is codegen'd if the backend supports it. 
8: required bool disable_codegen - 27: required list pipelines + 9: required list pipelines // one field per PlanNode subclass - 9: optional THdfsScanNode hdfs_scan_node - 10: optional THBaseScanNode hbase_scan_node - 11: optional TKuduScanNode kudu_scan_node - 12: optional TDataSourceScanNode data_source_node - 13: optional TJoinNode join_node + 10: optional THdfsScanNode hdfs_scan_node + 11: optional THBaseScanNode hbase_scan_node + 12: optional TKuduScanNode kudu_scan_node + 13: optional TDataSourceScanNode data_source_node + 14: optional TJoinNode join_node 15: optional TAggregationNode agg_node 16: optional TSortNode sort_node 17: optional TUnionNode union_node 18: optional TExchangeNode exchange_node 19: optional TAnalyticNode analytic_node 20: optional TUnnestNode unnest_node + 21: optional TIcebergMetadataScanNode iceberg_scan_metadata_node // Label that should be used to print this node to the user. - 21: optional string label + 22: optional string label // Additional details that should be printed to the user. This is node specific // e.g. table name, join strategy, etc. - 22: optional string label_detail + 23: optional string label_detail // Estimated execution stats generated by the planner. - 23: optional ExecStats.TExecStats estimated_stats + 24: optional ExecStats.TExecStats estimated_stats // Runtime filters assigned to this plan node - 24: optional list runtime_filters + 25: optional list runtime_filters // Resource profile for this plan node. - 25: required ResourceProfile.TBackendResourceProfile resource_profile + 26: required ResourceProfile.TBackendResourceProfile resource_profile - 26: optional TCardinalityCheckNode cardinality_check_node + 27: optional TCardinalityCheckNode cardinality_check_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 224be7859..230cf351a 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -3618,7 +3618,7 @@ public class Analyzer { // The catalog table (the base of the virtual table) has been loaded and cached // under the name of the virtual table. 
FeTable catalogTable = getStmtTableCache().tables.get(virtualTableName); - if (catalogTable instanceof IcebergMetadataTable) return; + if (catalogTable instanceof IcebergMetadataTable || catalogTable == null) return; IcebergMetadataTable virtualTable = new IcebergMetadataTable(catalogTable, tblRefPath.get(2)); getStmtTableCache().tables.put(catalogTableName, catalogTable); diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java b/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java index 78823d002..294b7c56b 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java @@ -29,6 +29,8 @@ import com.google.common.base.Preconditions; */ public class IcebergMetadataTableRef extends TableRef { + private String metadataTableName_; + public IcebergMetadataTableRef(TableRef tableRef, Path resolvedPath) { super(tableRef); Preconditions.checkState(resolvedPath.isResolved()); @@ -37,20 +39,28 @@ public class IcebergMetadataTableRef extends TableRef { resolvedPath_ = resolvedPath; IcebergMetadataTable iceMTbl = (IcebergMetadataTable)resolvedPath.getRootTable(); FeIcebergTable iceTbl = iceMTbl.getBaseTable(); + metadataTableName_ = iceMTbl.getMetadataTableName(); if (hasExplicitAlias()) return; aliases_ = new String[] { iceTbl.getTableName().toString().toLowerCase(), iceTbl.getName().toLowerCase()}; } + public String getMetadataTableName() { + return metadataTableName_; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException { if (isAnalyzed_) return; IcebergMetadataTable rootTable = (IcebergMetadataTable)resolvedPath_.getRootTable(); FeTable iceRootTable = rootTable.getBaseTable(); analyzer.registerAuthAndAuditEvent(iceRootTable, priv_, requireGrantOption_); + analyzeTimeTravel(analyzer); desc_ = analyzer.registerTableRef(this); isAnalyzed_ = true; + analyzeHints(analyzer); + analyzeJoin(analyzer); } } diff --git a/fe/src/main/java/org/apache/impala/analysis/Path.java b/fe/src/main/java/org/apache/impala/analysis/Path.java index 2f37489e4..f26c63ab0 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Path.java +++ b/fe/src/main/java/org/apache/impala/analysis/Path.java @@ -29,6 +29,7 @@ import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.VirtualColumn; +import org.apache.impala.catalog.VirtualTable; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.thrift.TVirtualColumnType; import org.apache.impala.util.AcidUtils; @@ -447,6 +448,9 @@ public class Path { } else { result.add(rootTable_.getDb().getName()); result.add(rootTable_.getName()); + if (rootTable_ instanceof VirtualTable) { + result.add(((IcebergMetadataTable)rootTable_).getMetadataTableName()); + } } result.addAll(rawPath_); return result; diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java index b7d0dcbe0..a92b07471 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java @@ -27,7 +27,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.impala.analysis.TableName; import org.apache.impala.catalog.Column; -import org.apache.impala.catalog.FeCatalogUtils; 
import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.VirtualTable; @@ -45,7 +44,11 @@ import com.google.common.base.Preconditions; * table object based on the Iceberg API. */ public class IcebergMetadataTable extends VirtualTable { + + // The Iceberg table that is the base of the metadata table. private FeIcebergTable baseTable_; + + // Name of the metadata table. private String metadataTableName_; public IcebergMetadataTable(FeTable baseTable, String metadataTableTypeStr) @@ -53,7 +56,7 @@ public class IcebergMetadataTable extends VirtualTable { super(null, baseTable.getDb(), baseTable.getName(), baseTable.getOwnerUser()); Preconditions.checkArgument(baseTable instanceof FeIcebergTable); baseTable_ = (FeIcebergTable) baseTable; - metadataTableName_ = metadataTableTypeStr; + metadataTableName_ = metadataTableTypeStr.toUpperCase(); MetadataTableType type = MetadataTableType.from(metadataTableTypeStr.toUpperCase()); Preconditions.checkNotNull(type); Table metadataTable = MetadataTableUtils.createMetadataTableInstance( @@ -79,6 +82,10 @@ public class IcebergMetadataTable extends VirtualTable { return super.getFullName() + "." + metadataTableName_; } + public String getMetadataTableName() { + return metadataTableName_; + } + @Override public TableName getTableName() { return new TableName(db_.getName(), name_, metadataTableName_); @@ -105,7 +112,6 @@ public class IcebergMetadataTable extends VirtualTable { public TTableDescriptor toThriftDescriptor(int tableId, Set referencedPartitions) { TTableDescriptor desc = baseTable_.toThriftDescriptor(tableId, referencedPartitions); - desc.setColumnDescriptors(FeCatalogUtils.getTColumnDescriptors(this)); return desc; } diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 23601e098..2fa2b4445 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -116,8 +116,13 @@ public class DistributedPlanner { PlanFragment result = null; if (root instanceof ScanNode) { - result = createScanFragment(root); - fragments.add(result); + if (root instanceof IcebergMetadataScanNode) { + result = createIcebergMetadataScanFragment(root); + fragments.add(result); + } else { + result = createScanFragment(root); + fragments.add(result); + } } else if (root instanceof HashJoinNode) { Preconditions.checkState(childFragments.size() == 2); result = createHashJoinFragment((HashJoinNode) root, @@ -152,9 +157,6 @@ public class DistributedPlanner { ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED); } else if (root instanceof CardinalityCheckNode) { result = createCardinalityCheckNodeFragment((CardinalityCheckNode) root, childFragments); - } else if (root instanceof IcebergMetadataScanNode) { - result = createIcebergMetadataScanFragment(root); - fragments.add(result); } else if (root instanceof IcebergDeleteNode) { Preconditions.checkState(childFragments.size() == 2); result = createIcebergDeleteFragment((IcebergDeleteNode) root, @@ -406,7 +408,8 @@ public class DistributedPlanner { } /** - * Create an Iceberg Metadata scan fragment. + * Create an Iceberg Metadata scan fragment. This fragment is UNPARTITIONED, so the + * scheduler can schedule it as coordinator only fragment. 
*/ private PlanFragment createIcebergMetadataScanFragment(PlanNode node) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java index 4a776786e..181cce8d3 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java @@ -17,28 +17,45 @@ package org.apache.impala.planner; +import java.util.List; + import org.apache.impala.analysis.Analyzer; -import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.TableRef; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.ImpalaException; import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TIcebergMetadataScanNode; import org.apache.impala.thrift.TPlanNode; +import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TScanRangeSpec; +import org.apache.impala.thrift.TTableName; import com.google.common.base.Preconditions; public class IcebergMetadataScanNode extends ScanNode { - protected IcebergMetadataScanNode(PlanNodeId id, TupleDescriptor desc) { - super(id, desc, "SCAN ICEBERG METADATA"); + // Metadata table name, it is stored here so it can be passed to the backend. + protected final String metadataTableName_; + + protected IcebergMetadataScanNode(PlanNodeId id, List conjuncts, + TableRef tblRef) { + super(id, tblRef.getDesc(), "SCAN ICEBERG METADATA"); + conjuncts_ = conjuncts; + metadataTableName_ = ((IcebergMetadataTable)tblRef.getTable()).getMetadataTableName(); } @Override public void init(Analyzer analyzer) throws ImpalaException { - super.init(analyzer); scanRangeSpecs_ = new TScanRangeSpec(); + assignConjuncts(analyzer); + conjuncts_ = orderConjunctsByCost(conjuncts_); + analyzer.materializeSlots(conjuncts_); computeMemLayout(analyzer); computeStats(analyzer); + numInstances_ = 1; + numNodes_ = 1; } @Override @@ -62,7 +79,12 @@ public class IcebergMetadataScanNode extends ScanNode { @Override protected void toThrift(TPlanNode msg) { - // Implement for fragment execution + msg.iceberg_scan_metadata_node = new TIcebergMetadataScanNode(); + msg.node_type = TPlanNodeType.ICEBERG_METADATA_SCAN_NODE; + msg.iceberg_scan_metadata_node.table_name = + new TTableName(desc_.getTableName().getDb(), desc_.getTableName().getTbl()); + msg.iceberg_scan_metadata_node.metadata_table_name = metadataTableName_; + msg.iceberg_scan_metadata_node.tuple_id = desc_.getId().asInt(); } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index 0d4d37ca2..4b26a5132 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -81,6 +81,7 @@ import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.VirtualColumn; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; @@ -141,7 +142,8 @@ 
public class IcebergScanPlanner { public IcebergScanPlanner(Analyzer analyzer, PlannerContext ctx, TableRef iceTblRef, List conjuncts, MultiAggregateInfo aggInfo) throws ImpalaException { - Preconditions.checkState(iceTblRef.getTable() instanceof FeIcebergTable); + Preconditions.checkState(iceTblRef.getTable() instanceof FeIcebergTable || + iceTblRef.getTable() instanceof IcebergMetadataTable); analyzer_ = analyzer; ctx_ = ctx; tblRef_ = iceTblRef; diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 9437fddf6..03d573002 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -71,6 +71,7 @@ import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.TableLoadingException; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; @@ -1872,7 +1873,6 @@ public class SingleNodePlanner { Expr.removeDuplicates(conjuncts); } - // TODO(todd) introduce FE interfaces for DataSourceTable, HBaseTable, KuduTable FeTable table = tblRef.getTable(); if (table instanceof FeFsTable) { if (table instanceof FeIcebergTable) { @@ -1897,16 +1897,18 @@ public class SingleNodePlanner { aggInfo, tblRef); scanNode.init(analyzer); return scanNode; + } else if (table instanceof IcebergMetadataTable) { + return createIcebergMetadataScanNode(tblRef, conjuncts, analyzer); } else { throw new NotImplementedException( "Planning not implemented for table class: " + table.getClass()); } } - private PlanNode createIcebergMetadataScanNode(TableRef tblRef, Analyzer analyzer) - throws ImpalaException { + private PlanNode createIcebergMetadataScanNode(TableRef tblRef, List conjuncts, + Analyzer analyzer) throws ImpalaException { IcebergMetadataScanNode icebergMetadataScanNode = - new IcebergMetadataScanNode(ctx_.getNextNodeId(), tblRef.getDesc()); + new IcebergMetadataScanNode(ctx_.getNextNodeId(), conjuncts, tblRef); icebergMetadataScanNode.init(analyzer); return icebergMetadataScanNode; } @@ -2224,7 +2226,7 @@ public class SingleNodePlanner { result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan()); result.init(analyzer); } else if (tblRef instanceof IcebergMetadataTableRef) { - result = createIcebergMetadataScanNode(tblRef, analyzer); + result = createScanNode(tblRef, aggInfo, analyzer); } else { throw new NotImplementedException( "Planning not implemented for table ref class: " + tblRef.getClass()); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index da34d41ce..6bb6b7015 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2490,14 +2490,6 @@ public class Frontend { return result; } - // Blocking query execution for queries that contain IcebergMetadataTables - for (FeTable table : stmtTableCache.tables.values()) { - if (table instanceof IcebergMetadataTable) { - throw new NotImplementedException(String.format("'%s' refers to a metadata " - + "table which is currently not supported.", table.getFullName())); - } - } - result.setQuery_exec_request(queryExecRequest); if 
(analysisResult.isQueryStmt()) { result.stmt_type = TStmtType.QUERY; diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index b672cb6a8..763256a0c 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -38,8 +38,10 @@ import org.apache.impala.authentication.saml.WrappedWebContext; import org.apache.impala.authorization.AuthorizationFactory; import org.apache.impala.authorization.ImpalaInternalAdminUser; import org.apache.impala.authorization.User; +import org.apache.impala.catalog.DatabaseNotFoundException; import org.apache.impala.catalog.FeDataSource; import org.apache.impala.catalog.FeDb; +import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; @@ -53,6 +55,7 @@ import org.apache.impala.service.Frontend.PlanCtx; import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TBuildTestDescriptorTableParams; import org.apache.impala.thrift.TCatalogObject; +import org.apache.impala.thrift.TColumnValue; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TDescribeDbParams; import org.apache.impala.thrift.TDescribeOutputStyle; @@ -112,6 +115,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.IllegalArgumentException; +import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; @@ -614,6 +618,13 @@ public class JniFrontend { frontend_.waitForCatalog(); } + FeTable getCatalogTable(byte[] tableNameParam) throws ImpalaException { + Preconditions.checkNotNull(frontend_); + TTableName tableName = new TTableName(); + JniUtil.deserializeThrift(protocolFactory_, tableName, tableNameParam); + return frontend_.getCatalog().getTable(tableName.db_name, tableName.table_name); + } + // Caching this saves ~50ms per call to getHadoopConfigAsHtml private static final Configuration CONF = new Configuration(); private static final Groups GROUPS = Groups.getUserToGroupsMappingService(CONF); diff --git a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java new file mode 100644 index 000000000..a2f7c5b23 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
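The getCatalogTable() entry point above is what ties the backend scan node to the coordinator's table cache: the backend asks the frontend for the already-loaded FeTable and hands it to the metadata scanner defined just below. A Java-side sketch of that handoff (illustrative only; the backend actually performs it through the Frontend::GetCatalogTable()/JNI plumbing added earlier in this patch, and the 'SNAPSHOTS' table name is just an example):

  import org.apache.impala.catalog.FeIcebergTable;
  import org.apache.impala.catalog.FeTable;
  import org.apache.impala.common.ImpalaException;
  import org.apache.impala.service.Frontend;
  import org.apache.impala.util.IcebergMetadataScanner;

  class CatalogHandoffSketch {
    // 'frontend' is the process-wide Frontend instance that JniFrontend wraps.
    static IcebergMetadataScanner openScanner(Frontend frontend, String db, String tbl)
        throws ImpalaException {
      FeTable table = frontend.getCatalog().getTable(db, tbl);
      IcebergMetadataScanner scanner =
          new IcebergMetadataScanner((FeIcebergTable) table, "SNAPSHOTS");
      scanner.ScanMetadataTable();
      return scanner;
    }
  }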
+ +package org.apache.impala.util; + +import com.google.common.base.Preconditions; + +import org.apache.iceberg.Accessor; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.StructLike; +import org.apache.impala.catalog.FeIcebergTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterator; + +/** + * The metadata table scanner class to scan Iceberg metadata tables through the Iceberg + * API. This object is instantiated and governed by {IcebergMetadataScanNode} at the + * backend during scanning. + * + * Iceberg generally throws RuntimeExceptions, these have to be taken care of by the + * caller of {@code IcebergMetadataScanner}. + */ +public class IcebergMetadataScanner { + // FeTable object is extracted by the backend and passed when this object is created + private FeIcebergTable iceTbl_ = null; + + // Metadata table + private Table metadataTable_ = null; + + // Name of the metadata table + private String metadataTableName_; + + // Persist the file scan task iterator so we can continue after a RowBatch is full + private CloseableIterator<FileScanTask> fileScanTaskIterator_; + + // Persist the data rows iterator, so we can continue after a batch is filled + private CloseableIterator<StructLike> dataRowsIterator_; + + public IcebergMetadataScanner(FeIcebergTable iceTbl, String metadataTableName) { + Preconditions.checkNotNull(iceTbl); + this.iceTbl_ = (FeIcebergTable) iceTbl; + this.metadataTableName_ = metadataTableName; + } + + /** + * Iterates over the {{fileScanTaskIterator_}} to find a {FileScanTask} that has rows. + */ + public boolean FindFileScanTaskWithRows() { + while (fileScanTaskIterator_.hasNext()) { + DataTask dataTask = (DataTask)fileScanTaskIterator_.next(); + dataRowsIterator_ = dataTask.rows().iterator(); + if (dataRowsIterator_.hasNext()) return true; + } + return false; + } + + /** + * Creates the Metadata{Table} which is a predefined Iceberg {Table} object. This method + * also starts an Iceberg {TableScan} to scan the {Table}. After the scan is ready it + * initializes the iterators, so the {GetNext} call can start fetching the rows through + * the Iceberg API. + */ + public void ScanMetadataTable() { + // Create and scan the metadata table + metadataTable_ = MetadataTableUtils.createMetadataTableInstance( + iceTbl_.getIcebergApiTable(), MetadataTableType.valueOf(metadataTableName_)); + TableScan scan = metadataTable_.newScan(); + // Init the FileScanTask iterator and DataRowsIterator + fileScanTaskIterator_ = scan.planFiles().iterator(); + FindFileScanTaskWithRows(); + } + + /** + * Returns the field {Accessor} for the specified column position. This {Accessor} is + * used to access a field in the {StructLike} object. + */ + public Accessor<StructLike> GetAccessor(int slotColPos) { + int fieldId = metadataTable_.schema().columns().get(slotColPos).fieldId(); + return metadataTable_.schema().accessorForField(fieldId); + } + + /** + * Returns the next available row of the scan result. The row is a {StructLike} object + * and its fields can be accessed with {Accessor}s.
+ */ + public StructLike GetNext() { + // Return the next row in the DataRows iterator + if (dataRowsIterator_.hasNext()) { + return dataRowsIterator_.next(); + } + // Otherwise this DataTask is empty, find a FileScanTask that has a non-empty DataTask + if(FindFileScanTaskWithRows()) { + return dataRowsIterator_.next(); + } + return null; + } +} diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index 387618008..a15e5bd28 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -3758,6 +3758,23 @@ INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, fa ---- DATASET functional ---- BASE_TABLE_NAME +iceberg_query_metadata +---- CREATE +CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} ( + i int +) +STORED BY ICEBERG +LOCATION '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata' +TBLPROPERTIES('format-version'='2'); +---- DEPENDENT_LOAD +INSERT INTO {db_name}{db_suffix}.{table_name} VALUES (1); +INSERT INTO {db_name}{db_suffix}.{table_name} VALUES (2); +INSERT INTO {db_name}{db_suffix}.{table_name} VALUES (3); +DELETE FROM {db_name}{db_suffix}.{table_name} WHERE i = 2; +==== +---- DATASET +functional +---- BASE_TABLE_NAME iceberg_lineitem_multiblock ---- CREATE CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv index 2418404fc..972715384 100644 --- a/testdata/datasets/functional/schema_constraints.csv +++ b/testdata/datasets/functional/schema_constraints.csv @@ -96,6 +96,7 @@ table_name:iceberg_v2_partitioned_position_deletes_orc, constraint:restrict_to, table_name:iceberg_multiple_storage_locations, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_avro_format, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_mixed_file_format, constraint:restrict_to, table_format:parquet/none/none +table_name:iceberg_test_metadata, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_lineitem_sixblocks, constraint:restrict_to, table_format:parquet/none/none diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test index f976cb75e..a605c94b8 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test @@ -1,102 +1,116 @@ -explain SELECT * FROM functional_parquet.iceberg_alltypes_part_orc.history +SELECT * FROM functional_parquet.iceberg_alltypes_part_orc.history ---- PLAN PLAN-ROOT SINK | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY] row-size=33B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -01:EXCHANGE [UNPARTITIONED] -| -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY] row-size=33B cardinality=unavailable ==== -explain select * +select * from 
functional_parquet.iceberg_alltypes_part_orc.history q join functional_parquet.iceberg_alltypes_part_orc.history z on z.snapshot_id = q.snapshot_id ---- PLAN PLAN-ROOT SINK | -02:NESTED LOOP JOIN [CROSS JOIN] +02:HASH JOIN [INNER JOIN] +| hash predicates: q.snapshot_id = z.snapshot_id | row-size=66B cardinality=unavailable | -|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] | row-size=33B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=33B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -04:EXCHANGE [UNPARTITIONED] -| -02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: q.snapshot_id = z.snapshot_id | row-size=66B cardinality=unavailable | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [UNPARTITIONED] | | -| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] | row-size=33B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=33B cardinality=unavailable ==== -explain select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 +select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 from functional_parquet.iceberg_alltypes_part_orc.history q join functional_parquet.iceberg_alltypes_part_orc.history z on z.snapshot_id = q.snapshot_id ---- PLAN PLAN-ROOT SINK | -02:NESTED LOOP JOIN [CROSS JOIN] -| row-size=24B cardinality=unavailable +02:HASH JOIN [INNER JOIN] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable | -|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] -| row-size=16B cardinality=unavailable +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=8B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -04:EXCHANGE [UNPARTITIONED] +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable | -02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] -| row-size=24B cardinality=unavailable -| -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [UNPARTITIONED] | | -| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] -| row-size=16B cardinality=unavailable +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=8B cardinality=unavailable ==== -explain select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries +select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 +from functional_parquet.iceberg_alltypes_part_orc.history q + join /* +SHUFFLE */ 
functional_parquet.iceberg_alltypes_part_orc.history z + on z.snapshot_id = q.snapshot_id ---- PLAN PLAN-ROOT SINK | -01:SUBPLAN -| row-size=98B cardinality=unavailable +02:HASH JOIN [INNER JOIN] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable | -|--04:NESTED LOOP JOIN [CROSS JOIN] -| | row-size=98B cardinality=10 -| | -| |--02:SINGULAR ROW SRC -| | row-size=72B cardinality=1 -| | -| 03:UNNEST [a.partition_summaries] -| row-size=0B cardinality=10 +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.manifests a] - row-size=72B cardinality=unavailable +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] + row-size=8B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 05:EXCHANGE [UNPARTITIONED] | +02:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable +| +|--04:EXCHANGE [HASH(z.snapshot_id)] +| | +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable +| +03:EXCHANGE [HASH(q.snapshot_id)] +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] + row-size=8B cardinality=unavailable +==== +select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries +---- PLAN +PLAN-ROOT SINK +| 01:SUBPLAN | row-size=98B cardinality=unavailable | @@ -109,6 +123,23 @@ PLAN-ROOT SINK | 03:UNNEST [a.partition_summaries] | row-size=0B cardinality=10 | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.manifests a] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a] + row-size=72B cardinality=unavailable +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:SUBPLAN +| row-size=98B cardinality=unavailable +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | row-size=98B cardinality=10 +| | +| |--02:SINGULAR ROW SRC +| | row-size=72B cardinality=1 +| | +| 03:UNNEST [a.partition_summaries] +| row-size=0B cardinality=10 +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a] row-size=72B cardinality=unavailable ==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test index ee1059e41..5a9928451 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test @@ -1,132 +1,456 @@ +# The test table for these tests are created during dataload by Impala. An existing table +# could not have been rewritten manually, because avrotools removes additional schemata +# from the manifests files that Iceberg adds. Therefore, the query results are checked +# with regexp. +#### +# Test 0 : Query all the metadata tables once +#### ==== ---- QUERY -# List of all metadata tables in current version -select * from functional_parquet.iceberg_alltypes_part_orc.entries ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.entries' refers to a metadata table which is currently not supported. 
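Test 0 below issues one query per metadata table against the new iceberg_query_metadata test table. The names accepted after the final dot correspond to Iceberg's MetadataTableType enum, which is also what the scanner resolves with MetadataTableType.valueOf(); a quick way to list them (a sketch, not part of the patch):

  import org.apache.iceberg.MetadataTableType;

  public class ListMetadataTableTypes {
    public static void main(String[] args) {
      // Prints ENTRIES, FILES, DATA_FILES, DELETE_FILES, HISTORY, SNAPSHOTS,
      // MANIFESTS, PARTITIONS, the ALL_* variants, and so on.
      for (MetadataTableType type : MetadataTableType.values()) {
        System.out.println(type.name());
      }
    }
  }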
+select * from functional_parquet.iceberg_query_metadata.entries; +---- RESULTS +# Example: +# 1,8283026816932323050,3,3 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 ---- TYPES -STRING +INT,BIGINT,BIGINT,BIGINT ==== ---- QUERY -# 'Files' is a keyword and need to be escaped -select * from functional_parquet.iceberg_alltypes_part_orc.`files` ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.`files`; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/754b1471ee8d8aa2-4f2f33ef00000000_134436143_data.0.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.data_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.data_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.data_files; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/944a2355e618932f-18f086b600000000_1283312202_data.0.parq','PARQUET',0,1,351,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.delete_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.delete_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.delete_files; +---- RESULTS +# Example: +# 1,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/delete-1f43b217940cc094-fedf515600000000_248998721_data.0.parq','PARQUET',0,1,1489,'NULL',NULL +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.history ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. 
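The files, data_files and delete_files rows above surface per-file metadata: content id (0 = data, 1 = position deletes), path, format, record count and size. The same information is reachable through the core Iceberg scan API; a rough sketch, assuming table is an already loaded org.apache.iceberg.Table and printFiles is an illustrative helper:

  import java.io.IOException;
  import org.apache.iceberg.DataFile;
  import org.apache.iceberg.DeleteFile;
  import org.apache.iceberg.FileScanTask;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.io.CloseableIterable;

  public class FileMetadataSketch {
    static void printFiles(Table table) throws IOException {
      try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
        for (FileScanTask task : tasks) {
          DataFile f = task.file();
          // Mirrors the leading columns of the files/data_files tables.
          System.out.printf("%s %s %s %d %d%n", f.content(), f.path(), f.format(),
              f.recordCount(), f.fileSizeInBytes());
          // Delete files attached to this task, as shown by delete_files.
          for (DeleteFile d : task.deletes()) {
            System.out.printf("%s %s%n", d.content(), d.path());
          }
        }
      }
    }
  }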
+select * from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.523000000,9046920472784493998,8491702501245661704,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true ---- TYPES -STRING +TIMESTAMP,BIGINT,BIGINT,BOOLEAN ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.snapshots ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.snapshots' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.metadata_log_entries; +---- RESULTS +# Example: +# 2023-08-16 12:18:11.061000000,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/00000-0ae98ebd-b200-4381-9d97-1f93954423a9.metadata.json',NULL,NULL,NULL +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',NULL,NULL,NULL +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,1 +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,2 +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,3 +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,4 ---- TYPES -STRING +TIMESTAMP,STRING,BIGINT,INT,BIGINT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.manifests ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.manifests' refers to a metadata table which is currently not supported. 
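The history rows map onto Table.history() in the Iceberg API; the NULL parent_id in the first row is the initial snapshot, and made_current_at is an epoch timestamp that surfaces as an Impala TIMESTAMP. A sketch of reading the same data directly, again assuming an already loaded table and an illustrative helper name:

  import java.time.Instant;
  import org.apache.iceberg.HistoryEntry;
  import org.apache.iceberg.Table;

  public class HistorySketch {
    static void printHistory(Table table) {
      for (HistoryEntry entry : table.history()) {
        // timestampMillis() backs made_current_at; snapshotId() backs snapshot_id.
        System.out.println(Instant.ofEpochMilli(entry.timestampMillis())
            + " " + entry.snapshotId());
      }
    }
  }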
+select * from functional_parquet.iceberg_query_metadata.snapshots; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.322000000,8491702501245661704,NULL,'append','hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/snap-8491702501245661704-1-88a39285-529f-41a4-bd69-6d2560fac64e.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,'append','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'append','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'append','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'overwrite','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' ---- TYPES -STRING +TIMESTAMP,BIGINT,BIGINT,STRING,STRING ==== ---- QUERY -# 'Partitions' is a keyword and need to be escaped -select * from functional_parquet.iceberg_alltypes_part_orc.`partitions` ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.partitions' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.refs; +---- RESULTS +row_regex:'main','BRANCH',[1-9]\d*|0,NULL,NULL,NULL ---- TYPES -STRING +STRING,STRING,BIGINT,BIGINT,INT,BIGINT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_data_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_data_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.manifests; +---- RESULTS +# Example: +# row_regex:0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/38e5a1bd-5b7f-4eae-9362-16a2de3c575d-m0.avro',6631,0,8283026816932323050,1,0,0,0,0,0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,0,0,0,1,0,0 ---- TYPES -STRING +INT,STRING,BIGINT,INT,BIGINT,INT,INT,INT,INT,INT,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.`partitions`; +---- RESULTS +3,3,1,1,0,0 ---- TYPES -STRING +BIGINT,INT,BIGINT,INT,BIGINT,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_manifests ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_manifests' refers to a metadata table which is currently not supported. 
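The snapshots, refs and manifests results similarly mirror Snapshot, SnapshotRef and ManifestFile objects; the single 'overwrite' snapshot comes from the DELETE issued during data load. A sketch of the corresponding API calls, assuming an already loaded table:

  import org.apache.iceberg.ManifestFile;
  import org.apache.iceberg.Snapshot;
  import org.apache.iceberg.Table;

  public class SnapshotSketch {
    static void printSnapshots(Table table) {
      System.out.println(table.refs().keySet()); // e.g. [main], as in the refs table
      for (Snapshot snapshot : table.snapshots()) {
        // committed_at, snapshot_id, parent_id, operation, manifest_list
        System.out.printf("%d %d %s %s %s%n", snapshot.timestampMillis(),
            snapshot.snapshotId(), snapshot.parentId(), snapshot.operation(),
            snapshot.manifestListLocation());
        for (ManifestFile manifest : snapshot.allManifests(table.io())) {
          System.out.printf("  %s %d%n", manifest.path(), manifest.length());
        }
      }
    }
  }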
+select * from functional_parquet.iceberg_query_metadata.all_data_files; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/944a2355e618932f-18f086b600000000_1283312202_data.0.parq','PARQUET',0,1,351,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_entries ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_entries' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.all_delete_files; +---- RESULTS +# Example: +# 1,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/delete-1f43b217940cc094-fedf515600000000_248998721_data.0.parq','PARQUET',0,1,1489,'NULL',NULL +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -# Select list with column name -select snapshot_id from functional_parquet.iceberg_alltypes_part_orc.history ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.all_files; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/3d481ed88b2941f0-ea33816200000000_1109948289_data.0.parq','PARQUET',0,1,351,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -# Joining tables -select * -from functional_parquet.iceberg_alltypes_part_orc.history q - join functional_parquet.iceberg_alltypes_part_orc.snapshots z - on z.snapshot_id = q.snapshot_id ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. 
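The all_* tables differ from their plain counterparts in that they cover every reachable snapshot rather than only the current one, which is why all_files above also returns the delete file. A sketch that counts the rows of any metadata table through the API and makes the distinction easy to see (countRows is an illustrative helper):

  import java.io.IOException;
  import org.apache.iceberg.DataTask;
  import org.apache.iceberg.FileScanTask;
  import org.apache.iceberg.MetadataTableType;
  import org.apache.iceberg.MetadataTableUtils;
  import org.apache.iceberg.StructLike;
  import org.apache.iceberg.Table;
  import org.apache.iceberg.io.CloseableIterable;

  public class MetadataRowCountSketch {
    static long countRows(Table table, MetadataTableType type) throws IOException {
      Table metadataTable = MetadataTableUtils.createMetadataTableInstance(table, type);
      long rows = 0;
      try (CloseableIterable<FileScanTask> tasks = metadataTable.newScan().planFiles()) {
        for (FileScanTask task : tasks) {
          for (StructLike ignored : ((DataTask) task).rows()) rows++;
        }
      }
      return rows;
    }
  }

Calling countRows(table, MetadataTableType.DATA_FILES) covers the current snapshot only, while MetadataTableType.ALL_DATA_FILES walks the files referenced by every snapshot.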
+select * from functional_parquet.iceberg_query_metadata.all_manifests; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/38e5a1bd-5b7f-4eae-9362-16a2de3c575d-m0.avro',6631,0,8283026816932323050,1,0,0,0,0,0,7858675898458780516 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,0,0,0,1,0,0,[1-9]\d*|0 ---- TYPES -STRING +INT,STRING,BIGINT,INT,BIGINT,INT,INT,INT,INT,INT,INT,BIGINT ==== ---- QUERY -# Inline query -select x.snapshot_id -from (select * from functional_parquet.iceberg_alltypes_part_orc.history) x; ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.all_entries; +---- RESULTS +# Example: +# 1,7858675898458780516,4,4 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 ---- TYPES -STRING +INT,BIGINT,BIGINT,BIGINT + +#### +# Test 1 : Test select list +#### ==== ---- QUERY -# Complext type -select *, a.partition_summaries.pos from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.manifests' refers to a metadata table which is currently not supported. +select snapshot_id from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +# Example: +# 7858675898458780516 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 ---- TYPES -STRING +BIGINT ==== ---- QUERY -# Using complex type 'map' column without a join -select summary from functional_parquet.iceberg_alltypes_part_orc.snapshots; ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.snapshots' refers to a metadata table which is currently not supported. 
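Test 1 exercises the select list: only the requested slots are materialized, and each slot is resolved to an Iceberg field through the scanner's GetAccessor(). A sketch of the same lookup done by column name, assuming a StructLike row produced by a HISTORY scan (readSnapshotId is an illustrative helper, not part of the patch):

  import org.apache.iceberg.Accessor;
  import org.apache.iceberg.MetadataTableType;
  import org.apache.iceberg.MetadataTableUtils;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.StructLike;
  import org.apache.iceberg.Table;

  public class AccessorSketch {
    static Object readSnapshotId(Table table, StructLike row) {
      Table history = MetadataTableUtils.createMetadataTableInstance(
          table, MetadataTableType.HISTORY);
      Schema schema = history.schema();
      // Resolve the column to its field id, then build a positional accessor; this
      // mirrors GetAccessor(), which resolves by slot column position instead of name.
      int fieldId = schema.findField("snapshot_id").fieldId();
      Accessor<StructLike> accessor = schema.accessorForField(fieldId);
      return accessor.get(row);
    }
  }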
+select snapshot_id, * from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +# Example: +# 7858675898458780516,2023-08-16 12:18:18.584000000,7858675898458780516,8283026816932323050,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true ---- TYPES -STRING +BIGINT,TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +select count(*) from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +4 +---- TYPES +BIGINT +==== +---- QUERY +select record_count + file_count from functional_parquet.iceberg_query_metadata.`partitions`; +---- RESULTS +6 +---- TYPES +BIGINT + +#### +# Test 2 : Test filtering +#### +==== +---- QUERY +# Test BIGINT +select * from functional_parquet.iceberg_query_metadata.history +where snapshot_id = $OVERWRITE_SNAPSHOT_ID; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.523000000,9046920472784493998,8491702501245661704,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,$OVERWRITE_SNAPSHOT_ID,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +# Test BOOLEAN +select * from functional_parquet.iceberg_query_metadata.history +where is_current_ancestor = true; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.523000000,9046920472784493998,8491702501245661704,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +# Test STRING +select * from functional_parquet.iceberg_query_metadata.snapshots +where operation = 'overwrite'; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.322000000,8491702501245661704,NULL,'append','hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/snap-8491702501245661704-1-88a39285-529f-41a4-bd69-6d2560fac64e.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'overwrite','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +---- TYPES +TIMESTAMP,BIGINT,BIGINT,STRING,STRING +==== +---- QUERY +# Test TIMESTAMP +select * from functional_parquet.iceberg_query_metadata.history +where made_current_at = cast("$OVERWRITE_SNAPSHOT_TS" as timestamp); +---- RESULTS +row_regex:$OVERWRITE_SNAPSHOT_TS,$OVERWRITE_SNAPSHOT_ID,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +# Test conjunct slot materialization +select snapshot_id from functional_parquet.iceberg_query_metadata.snapshots +where operation = 'overwrite'; +---- RESULTS +$OVERWRITE_SNAPSHOT_ID +---- TYPES +BIGINT +==== +---- QUERY +# Test an expression rewrite: OR -> IN () +select * from functional_parquet.iceberg_query_metadata.history +where snapshot_id = $OVERWRITE_SNAPSHOT_ID or snapshot_id = 1; +---- RESULTS +row_regex:$OVERWRITE_SNAPSHOT_TS,$OVERWRITE_SNAPSHOT_ID,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN + +#### +# Test 2 : Test joins +#### +==== +---- QUERY +select a.snapshot_id, b.snapshot_id from 
functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.history b on a.snapshot_id = b.snapshot_id; +---- RESULTS +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +---- TYPES +BIGINT,BIGINT +==== +---- QUERY +select a.snapshot_id, b.parent_id from functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.history b on a.snapshot_id = b.snapshot_id; +---- RESULTS +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +---- TYPES +BIGINT,BIGINT +==== +---- QUERY +select count(b.parent_id) from functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.history b on a.snapshot_id = b.snapshot_id; +---- RESULTS +3 +---- TYPES +BIGINT +==== +---- QUERY +select a.snapshot_id from functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.snapshots b on a.snapshot_id = b.snapshot_id +where a.snapshot_id = $OVERWRITE_SNAPSHOT_ID; +---- RESULTS +$OVERWRITE_SNAPSHOT_ID +---- TYPES +BIGINT + +#### +# Test 3 : Inline query +#### +==== +---- QUERY +select a.snapshot_id +from (select * from functional_parquet.iceberg_query_metadata.history) a; +---- RESULTS +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +---- TYPES +BIGINT + +#### +# Test 4 : Complex types +# Currently not supported, complex type slots are set to NULL (IMPALA-12205) +#### +==== +---- QUERY +select snapshot_id, summary from functional_parquet.iceberg_query_metadata.snapshots; +---- RESULTS +row_regex:[1-9]\d*|0,'NULL' +row_regex:[1-9]\d*|0,'NULL' +row_regex:[1-9]\d*|0,'NULL' +row_regex:[1-9]\d*|0,'NULL' +---- TYPES +BIGINT,STRING + +#### +# Test 5 : Multiple RowBatch results +#### +==== +---- QUERY +set BATCH_SIZE=1; +select * from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN + + +#### +# Test 6 : Timetravel +# Timetravel is not supported currently, related Jira IMPALA-11991. +#### +==== +---- QUERY +select * from functional_parquet.iceberg_query_metadata.snapshots FOR SYSTEM_VERSION AS OF $OVERWRITE_SNAPSHOT_ID; +---- CATCH +AnalysisException: FOR SYSTEM_VERSION AS OF clause is only supported for Iceberg tables. functional_parquet.iceberg_query_metadata.SNAPSHOTS is not an Iceberg table. +==== + +#### +# Test 7 : Use-cases +#### +==== +---- QUERY +# All reachable manifest files size +select sum(length) from functional_parquet.iceberg_query_metadata.all_manifests; +---- RESULTS +row_regex:[1-9]\d*|0 +---- TYPES +BIGINT +==== +---- QUERY +# How many manifests? 
+SELECT count(*) FROM functional_parquet.iceberg_query_metadata.manifests; +---- RESULTS +4 +---- TYPES +BIGINT +==== +---- QUERY +# Join metadata table with table +SELECT i, INPUT__FILE__NAME, file_size_in_bytes from functional_parquet.iceberg_query_metadata tbl +JOIN functional_parquet.iceberg_query_metadata.all_files mtbl on tbl.input__file__name = mtbl.file_path; +---- RESULTS +row_regex:[1-9]\d*|0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',[1-9]\d*|0 +row_regex:[1-9]\d*|0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',[1-9]\d*|0 +---- TYPES +INT,STRING,BIGINT + +#### +# Test 8 : Invalid operations +# In most cases the parser catches the table reference. +#### +==== +---- QUERY +describe functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +AnalysisException: Could not resolve path: 'functional_parquet.iceberg_query_metadata.snapshots' +==== +---- QUERY +show create table functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +insert into table functional_parquet.iceberg_query_metadata.snapshots values (1); +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +refresh functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +invalidate metadata functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +drop table functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +alter table functional_parquet.iceberg_query_metadata.snapshots add columns (col int); +---- CATCH +ParseException: Syntax error in line 1 ==== \ No newline at end of file diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index dce1716c0..bdb14a0e0 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -2032,6 +2032,44 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("drop database if exists {0} cascade".format(unique_database), user=ADMIN) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def test_iceberg_metadata_table_privileges(self, unique_name): + user = getuser() + admin_client = self.create_impala_client() + non_admin_client = self.create_impala_client() + short_table_name = "ice_1" + unique_database = unique_name + "_db" + tbl_name = unique_database + "." 
+ short_table_name + + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("create table {0} (a int) stored as iceberg" + .format(tbl_name), user=ADMIN) + + # At this point, non-admin user without select privileges cannot query the metadata + # tables + result = self.execute_query_expect_failure(non_admin_client, + "select * from {0}.history".format(tbl_name), user=user) + assert "User '{0}' does not have privileges to execute 'SELECT' on: {1}".format( + user, unique_database) in str(result) + + # Grant 'user' select privilege on the table + admin_client.execute("grant select on table {0} to user {1}".format(tbl_name, user), + user=ADMIN) + result = non_admin_client.execute("select * from {0}.history".format(tbl_name), + user=user) + assert result.success is True + + finally: + admin_client.execute("revoke select on table {0} from user {1}" + .format(tbl_name, user), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database), + user=ADMIN) + @pytest.mark.execute_serially @SkipIf.is_test_jdk @SkipIfFS.hive diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index d6317d268..c47772bf4 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1213,8 +1213,17 @@ class TestIcebergV2Table(IcebergTestSuite): use_db="functional_parquet") def test_metadata_tables(self, vector): - self.run_test_case('QueryTest/iceberg-metadata-tables', vector, - use_db="functional_parquet") + with self.create_impala_client() as impalad_client: + overwrite_snapshot_id = impalad_client.execute("select snapshot_id from " + "functional_parquet.iceberg_query_metadata.snapshots " + "where operation = 'overwrite';") + overwrite_snapshot_ts = impalad_client.execute("select committed_at from " + "functional_parquet.iceberg_query_metadata.snapshots " + "where operation = 'overwrite';") + self.run_test_case('QueryTest/iceberg-metadata-tables', vector, + use_db="functional_parquet", + test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]), + '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])}) def test_delete(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-delete', vector,