diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 8c4a4cf0b..4532eb9e9 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -495,6 +495,7 @@ set (IMPALA_LIBS ExecRcfile ExecSequence ExecText + ExecIcebergMetadata Exprs ExprsIr GlobalFlags @@ -594,6 +595,7 @@ if (BUILD_SHARED_LIBS) ExecRcfile ExecSequence ExecText + ExecIcebergMetadata CodeGen Exprs Rpc diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 12e2534e5..3f4fb4c87 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -24,6 +24,7 @@ add_subdirectory(parquet) add_subdirectory(rcfile) add_subdirectory(sequence) add_subdirectory(text) +add_subdirectory(iceberg-metadata) # where to put generated libraries set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index 7779cf2c7..745d9d51c 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -38,6 +38,7 @@ #include "exec/hdfs-scan-node-mt.h" #include "exec/hdfs-scan-node.h" #include "exec/iceberg-delete-node.h" +#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h" #include "exec/kudu/kudu-scan-node-mt.h" #include "exec/kudu/kudu-scan-node.h" #include "exec/kudu/kudu-util.h" @@ -226,6 +227,9 @@ Status PlanNode::CreatePlanNode( case TPlanNodeType::ICEBERG_DELETE_NODE: *node = pool->Add(new IcebergDeletePlanNode()); break; + case TPlanNodeType::ICEBERG_METADATA_SCAN_NODE: + *node = pool->Add(new IcebergMetadataScanPlanNode()); + break; default: map::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); diff --git a/be/src/exec/iceberg-metadata/CMakeLists.txt b/be/src/exec/iceberg-metadata/CMakeLists.txt new file mode 100644 index 000000000..ea63de71a --- /dev/null +++ b/be/src/exec/iceberg-metadata/CMakeLists.txt @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# where to put generated libraries +set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/iceberg-metadata") + +# where to put generated binaries +set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/iceberg-metadata") + +add_library(ExecIcebergMetadata + iceberg-metadata-scan-node.cc + iceberg-row-reader.cc +) + +add_dependencies(ExecIcebergMetadata gen-deps) diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc new file mode 100644 index 000000000..1e1ba76a5 --- /dev/null +++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h" +#include "exec/exec-node.inline.h" +#include "runtime/exec-env.h" +#include "runtime/runtime-state.h" +#include "runtime/tuple-row.h" +#include "service/frontend.h" +#include "util/jni-util.h" + +using namespace impala; + +Status IcebergMetadataScanPlanNode::CreateExecNode( + RuntimeState* state, ExecNode** node) const { + ObjectPool* pool = state->obj_pool(); + *node = pool->Add(new IcebergMetadataScanNode(pool, *this, state->desc_tbl())); + return Status::OK(); +} + +IcebergMetadataScanNode::IcebergMetadataScanNode(ObjectPool* pool, + const IcebergMetadataScanPlanNode& pnode, const DescriptorTbl& descs) + : ScanNode(pool, pnode, descs), + tuple_id_(pnode.tnode_->iceberg_scan_metadata_node.tuple_id), + table_name_(pnode.tnode_->iceberg_scan_metadata_node.table_name), + metadata_table_name_(pnode.tnode_->iceberg_scan_metadata_node.metadata_table_name) {} + +Status IcebergMetadataScanNode::InitJNI() { + DCHECK(impala_iceberg_metadata_scanner_cl_ == nullptr) << "InitJNI() already called!"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + // Global class references: + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/impala/util/IcebergMetadataScanner", + &impala_iceberg_metadata_scanner_cl_)); + // Method ids: + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "", "(Lorg/apache/impala/catalog/FeIcebergTable;Ljava/lang/String;)V", + &iceberg_metadata_scanner_ctor_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "ScanMetadataTable", "()V", &scan_metadata_table_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "GetNext", "()Lorg/apache/iceberg/StructLike;", &get_next_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, impala_iceberg_metadata_scanner_cl_, + "GetAccessor", "(I)Lorg/apache/iceberg/Accessor;", &get_accessor_)); + return Status::OK(); +} + +Status IcebergMetadataScanNode::Prepare(RuntimeState* state) { + RETURN_IF_ERROR(ScanNode::Prepare(state)); + scan_prepare_timer_ = ADD_TIMER(runtime_profile(), "ScanPrepareTime"); + iceberg_api_scan_timer_ = ADD_TIMER(runtime_profile(), "IcebergApiScanTime"); + SCOPED_TIMER(scan_prepare_timer_); + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); + if (tuple_desc_ == nullptr) { + return Status("Failed to get tuple descriptor, tuple id: " + + std::to_string(tuple_id_)); + } + // Get the FeTable object from the Frontend + jobject jtable; + RETURN_IF_ERROR(GetCatalogTable(&jtable)); + // Create the Java Scanner object and scan the table + jstring jstr_metadata_table_name = env->NewStringUTF(metadata_table_name_.c_str()); + jobject jmetadata_scanner = env->NewObject(impala_iceberg_metadata_scanner_cl_, + 
      iceberg_metadata_scanner_ctor_, jtable, jstr_metadata_table_name);
+  RETURN_ERROR_IF_EXC(env);
+  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jmetadata_scanner, &jmetadata_scanner_));
+  RETURN_ERROR_IF_EXC(env);
+  RETURN_IF_ERROR(ScanMetadataTable());
+  // Create field accessors
+  for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
+    jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_,
+        get_accessor_, slot_desc->col_pos());
+    RETURN_ERROR_IF_EXC(env);
+    jobject accessor_for_field_global_ref;
+    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field,
+        &accessor_for_field_global_ref));
+    jaccessors_[slot_desc->col_pos()] = accessor_for_field_global_ref;
+  }
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(ScanNode::Open(state));
+  iceberg_row_reader_.reset(new IcebergRowReader(tuple_desc_, jaccessors_));
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::GetNext(RuntimeState* state, RowBatch* row_batch,
+    bool* eos) {
+  RETURN_IF_CANCELLED(state);
+  JNIEnv* env = JniUtil::GetJNIEnv();
+  if (env == nullptr) return Status("Failed to get/create JVM");
+  SCOPED_TIMER(materialize_tuple_timer());
+  // Allocate buffer for RowBatch and init the tuple
+  uint8_t* tuple_buffer;
+  int64_t tuple_buffer_size;
+  RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size,
+      &tuple_buffer));
+  Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
+  tuple->Init(tuple_buffer_size);
+  while (!row_batch->AtCapacity()) {
+    int row_idx = row_batch->AddRow();
+    TupleRow* tuple_row = row_batch->GetRow(row_idx);
+    tuple_row->SetTuple(0, tuple);
+    // Get the next row from 'org.apache.impala.util.IcebergMetadataScanner'
+    jobject struct_like_row = env->CallObjectMethod(jmetadata_scanner_, get_next_);
+    RETURN_ERROR_IF_EXC(env);
+    // When 'struct_like_row' is null, there are no more rows to read
+    if (struct_like_row == nullptr) {
+      *eos = true;
+      return Status::OK();
+    }
+    // Translate a StructLikeRow from Iceberg to Tuple
+    RETURN_IF_ERROR(iceberg_row_reader_->MaterializeRow(env, struct_like_row, tuple,
+        row_batch->tuple_data_pool()));
+    env->DeleteLocalRef(struct_like_row);
+    RETURN_ERROR_IF_EXC(env);
+    COUNTER_ADD(rows_read_counter(), 1);
+
+    // Evaluate conjuncts on this tuple row
+    if (ExecNode::EvalConjuncts(conjunct_evals().data(),
+        conjunct_evals().size(), tuple_row)) {
+      row_batch->CommitLastRow();
+      tuple = reinterpret_cast<Tuple*>(
+          reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
+    } else {
+      // Reset the null bits, everything else will be overwritten
+      Tuple::ClearNullBits(tuple, tuple_desc_->null_bytes_offset(),
+          tuple_desc_->num_null_bytes());
+    }
+  }
+  return Status::OK();
+}
+
+void IcebergMetadataScanNode::Close(RuntimeState* state) {
+  if (is_closed()) return;
+  JNIEnv* env = JniUtil::GetJNIEnv();
+  if (env != nullptr) {
+    // Close global references
+    if (jmetadata_scanner_ != nullptr) env->DeleteGlobalRef(jmetadata_scanner_);
+    for (auto accessor : jaccessors_) {
+      if (accessor.second != nullptr) env->DeleteGlobalRef(accessor.second);
+    }
+  } else {
+    LOG(ERROR) << "Couldn't get JNIEnv, unable to release Global JNI references";
+  }
+  ScanNode::Close(state);
+}
+
+Status IcebergMetadataScanNode::GetCatalogTable(jobject* jtable) {
+  Frontend* fe = ExecEnv::GetInstance()->frontend();
+  RETURN_IF_ERROR(fe->GetCatalogTable(table_name_, jtable));
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::ScanMetadataTable() {
+  JNIEnv* env = JniUtil::GetJNIEnv();
+  if (env == nullptr) return Status("Failed to get/create JVM");
+  SCOPED_TIMER(iceberg_api_scan_timer_);
+  env->CallObjectMethod(jmetadata_scanner_, scan_metadata_table_);
+  RETURN_ERROR_IF_EXC(env);
+  return Status::OK();
+}
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
new file mode 100644
index 000000000..c994c7682
--- /dev/null
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/iceberg-metadata/iceberg-row-reader.h"
+#include "exec/scan-node.h"
+
+#include <jni.h>
+
+namespace impala {
+
+class ExecNode;
+class IcebergRowReader;
+class RuntimeState;
+class Status;
+
+/// Scan node for an Iceberg metadata table.
+/// The Iceberg API provides predefined metadata tables that can be scanned through the
+/// Iceberg API just like regular Iceberg tables. Although Impala uses its Parquet
+/// scanner to scan Iceberg data, the virtual nature of the metadata tables means they
+/// have to be scanned with the Iceberg API.
+///
+/// To scan these metadata tables, this scan node calls into the JVM and creates an
+/// 'IcebergMetadataScanner' object that does the scanning. Once the Iceberg scan is
+/// done, the scan node fetches the rows one by one and materializes the Iceberg rows
+/// into Impala row batches.
+///
+/// The flow of scanning is:
+/// 1. Backend: gets the FeIcebergTable object from the frontend
+/// 2. Backend: creates an IcebergMetadataScanner object on the Java heap
+/// 3. Backend: triggers a metadata table creation and scan on the frontend
+/// 4. Frontend: creates the metadata table and executes the scan
+/// 5. Backend: calls GetNext, which calls the IcebergMetadataScanner's GetNext
+/// 6. Frontend: IcebergMetadataScanner's GetNext iterates over the result set and
+///    returns a row in StructLike format
+/// 7. Backend: converts and materializes the returned StructLike object into a RowBatch
+///
+/// Note:
+/// This scan node should be executed on the coordinator, because it depends on the
+/// frontend's table cache.
+class IcebergMetadataScanPlanNode : public ScanPlanNode {
+ public:
+  Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
+  ~IcebergMetadataScanPlanNode() {}
+};
+
+class IcebergMetadataScanNode : public ScanNode {
+ public:
+  IcebergMetadataScanNode(ObjectPool* pool, const IcebergMetadataScanPlanNode& pnode,
+      const DescriptorTbl& descs);
+
+  /// JNI setup. Creates global references for Java classes and finds method ids.
+  /// Initializes static members; should be called once per process lifetime.
+  static Status InitJNI() WARN_UNUSED_RESULT;
+
+  /// Initializes counters, executes the Iceberg table scan and initializes accessors.
+  Status Prepare(RuntimeState* state) override;
+
+  /// Creates the Iceberg row reader.
+  Status Open(RuntimeState* state) override;
+
+  /// Fills the next row batch with the results returned by the Iceberg scan.
+  Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+
+  /// Finalize and close this operator.
+  void Close(RuntimeState* state) override;
+
+ private:
+  /// Global class references created with JniUtil.
+  inline static jclass impala_iceberg_metadata_scanner_cl_ = nullptr;
+
+  /// Method references created with JniUtil.
+  inline static jmethodID iceberg_metadata_scanner_ctor_ = nullptr;
+  inline static jmethodID scan_metadata_table_ = nullptr;
+  inline static jmethodID get_accessor_ = nullptr;
+  inline static jmethodID get_next_ = nullptr;
+
+  /// Iceberg metadata scanner Java object; it prepares the metadata table and
+  /// executes an Iceberg table scan. Allows the scan node to fetch the metadata from
+  /// the Java heap.
+  jobject jmetadata_scanner_;
+
+  /// Helper class to transform Iceberg rows into Impala tuples.
+  std::unique_ptr<IcebergRowReader> iceberg_row_reader_;
+
+  /// Accessor map for the scan result; pairs the slot ids with the Java Accessor
+  /// objects.
+  std::unordered_map<int, jobject> jaccessors_;
+
+  /// Tuple id resolved in Prepare() to set 'tuple_desc_'.
+  TupleId tuple_id_;
+
+  /// Descriptor of tuples read from the Iceberg metadata table.
+  const TupleDescriptor* tuple_desc_ = nullptr;
+
+  /// Table and metadata table names.
+  const TTableName table_name_;
+  const std::string metadata_table_name_;
+
+  /// Iceberg metadata scan specific counters.
+  RuntimeProfile::Counter* scan_prepare_timer_;
+  RuntimeProfile::Counter* iceberg_api_scan_timer_;
+
+  /// Initializes the metadata table and executes an Iceberg scan through JNI.
+  Status ScanMetadataTable();
+
+  /// Gets the FeIcebergTable from the frontend.
+  Status GetCatalogTable(jobject* jtable);
+};
+
+}
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
new file mode 100644
index 000000000..b033d8bda
--- /dev/null
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "exec/exec-node.inline.h" +#include "exec/iceberg-metadata/iceberg-row-reader.h" +#include "runtime/runtime-state.h" +#include "runtime/timestamp-value.inline.h" +#include "runtime/tuple-row.h" +#include "util/jni-util.h" + +namespace impala { + +IcebergRowReader::IcebergRowReader( + const TupleDescriptor* tuple_desc, const std::unordered_map& jaccessor) + : tuple_desc_(tuple_desc), + jaccessors_(jaccessor) {} + +Status IcebergRowReader::InitJNI() { + DCHECK(iceberg_accessor_cl_ == nullptr) << "InitJNI() already called!"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + // Global class references: + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/iceberg/Accessor", &iceberg_accessor_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/util/List", &list_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "org/apache/iceberg/types/Types$NestedField", &iceberg_nested_field_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/Boolean", &java_boolean_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/Integer", &java_int_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/Long", &java_long_cl_)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, + "java/lang/CharSequence", &java_char_sequence_cl_)); + + // Method ids: + RETURN_IF_ERROR(JniUtil::GetMethodID(env, list_cl_, "get", + "(I)Ljava/lang/Object;", &list_get_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, iceberg_accessor_cl_, "get", + "(Ljava/lang/Object;)Ljava/lang/Object;", &iceberg_accessor_get_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_boolean_cl_, "booleanValue", "()Z", + &boolean_value_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_int_cl_, "intValue", "()I", + &int_value_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_long_cl_, "longValue", "()J", + &long_value_)); + RETURN_IF_ERROR(JniUtil::GetMethodID(env, java_char_sequence_cl_, "toString", + "()Ljava/lang/String;", &char_sequence_to_string_)); + return Status::OK(); +} + +Status IcebergRowReader::MaterializeRow(JNIEnv* env, + jobject struct_like_row, Tuple* tuple, MemPool* tuple_data_pool) { + DCHECK(env != nullptr); + DCHECK(struct_like_row != nullptr); + DCHECK(tuple != nullptr); + DCHECK(tuple_data_pool != nullptr); + for (SlotDescriptor* slot_desc: tuple_desc_->slots()) { + jobject accessed_value = env->CallObjectMethod(jaccessors_.at(slot_desc->col_pos()), + iceberg_accessor_get_, struct_like_row); + RETURN_ERROR_IF_EXC(env); + if (accessed_value == nullptr) { + tuple->SetNull(slot_desc->null_indicator_offset()); + continue; + } + void* slot = tuple->GetSlot(slot_desc->tuple_offset()); + switch (slot_desc->type().type) { + case TYPE_BOOLEAN: { // java.lang.Boolean + RETURN_IF_ERROR(WriteBooleanSlot(env, accessed_value, slot)); + break; + } case TYPE_INT: { // java.lang.Integer + RETURN_IF_ERROR(WriteIntSlot(env, accessed_value, slot)); + break; + } case TYPE_BIGINT: { // java.lang.Long + RETURN_IF_ERROR(WriteLongSlot(env, accessed_value, slot)); + break; + } case TYPE_TIMESTAMP: { // org.apache.iceberg.types.TimestampType + RETURN_IF_ERROR(WriteTimeStampSlot(env, accessed_value, slot)); + break; + } case TYPE_STRING: { // java.lang.String + RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot, tuple_data_pool)); + break; + } + default: + // Skip the unsupported type and set it to NULL + tuple->SetNull(slot_desc->null_indicator_offset()); + VLOG(3) << "Skipping unsupported column type: " << 
            slot_desc->type().type;
+    }
+  }
+  return Status::OK();
+}
+
+Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value,
+    void* slot) {
+  DCHECK(env->IsInstanceOf(accessed_value, java_boolean_cl_) == JNI_TRUE);
+  jboolean result = env->CallBooleanMethod(accessed_value, boolean_value_);
+  RETURN_ERROR_IF_EXC(env);
+  *reinterpret_cast<bool*>(slot) = (result == JNI_TRUE);
+  return Status::OK();
+}
+
+Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot) {
+  DCHECK(env->IsInstanceOf(accessed_value, java_int_cl_) == JNI_TRUE);
+  jint result = env->CallIntMethod(accessed_value, int_value_);
+  RETURN_ERROR_IF_EXC(env);
+  *reinterpret_cast<int32_t*>(slot) = result;
+  return Status::OK();
+}
+
+Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot) {
+  DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
+  jlong result = env->CallLongMethod(accessed_value, long_value_);
+  RETURN_ERROR_IF_EXC(env);
+  *reinterpret_cast<int64_t*>(slot) = result;
+  return Status::OK();
+}
+
+Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject accessed_value,
+    void* slot) {
+  DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
+  jlong result = env->CallLongMethod(accessed_value, long_value_);
+  RETURN_ERROR_IF_EXC(env);
+  *reinterpret_cast<TimestampValue*>(slot) = TimestampValue::FromUnixTimeMicros(result,
+      UTCPTR);
+  return Status::OK();
+}
+
+Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
+    MemPool* tuple_data_pool) {
+  DCHECK(env->IsInstanceOf(accessed_value, java_char_sequence_cl_) == JNI_TRUE);
+  jstring result = static_cast<jstring>(env->CallObjectMethod(accessed_value,
+      char_sequence_to_string_));
+  RETURN_ERROR_IF_EXC(env);
+  JniUtfCharGuard str_guard;
+  RETURN_IF_ERROR(JniUtfCharGuard::create(env, result, &str_guard));
+  // Allocate memory and copy the string from the JVM to the RowBatch
+  int str_len = strlen(str_guard.get());
+  char* buffer = reinterpret_cast<char*>(tuple_data_pool->TryAllocateUnaligned(str_len));
+  if (UNLIKELY(buffer == nullptr)) {
+    string details = strings::Substitute("Failed to allocate $0 bytes for string.",
+        str_len);
+    return tuple_data_pool->mem_tracker()->MemLimitExceeded(nullptr, details, str_len);
+  }
+  memcpy(buffer, str_guard.get(), str_len);
+  reinterpret_cast<StringValue*>(slot)->ptr = buffer;
+  reinterpret_cast<StringValue*>(slot)->len = str_len;
+  return Status::OK();
+}
+
+}
\ No newline at end of file
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.h b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
new file mode 100644
index 000000000..15c71a038
--- /dev/null
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <jni.h>
+#include <unordered_map>
+
+namespace impala {
+
+class MemPool;
+class Status;
+class SlotDescriptor;
+class Tuple;
+class TupleDescriptor;
+
+/// Row reader for Iceberg table scans that translates a {StructLike} Java object into
+/// Impala rows. It utilizes the provided {Accessor} objects to do this translation.
+class IcebergRowReader {
+ public:
+  /// Initializes the tuple descriptor and accessors.
+  IcebergRowReader(const TupleDescriptor* tuple_desc,
+      const std::unordered_map<int, jobject>& jaccessors);
+
+  /// JNI setup. Creates global references for Java classes and finds method ids.
+  /// Initializes static members; should be called once per process lifetime.
+  static Status InitJNI();
+
+  /// Materializes the StructLike Java objects into Impala rows.
+  Status MaterializeRow(JNIEnv* env, jobject struct_like_row, Tuple* tuple,
+      MemPool* tuple_data_pool);
+
+ private:
+  /// Global class references created with JniUtil.
+  inline static jclass iceberg_accessor_cl_ = nullptr;
+  inline static jclass iceberg_nested_field_cl_ = nullptr;
+  inline static jclass list_cl_ = nullptr;
+  inline static jclass java_boolean_cl_ = nullptr;
+  inline static jclass java_int_cl_ = nullptr;
+  inline static jclass java_long_cl_ = nullptr;
+  inline static jclass java_char_sequence_cl_ = nullptr;
+
+  /// Method references created with JniUtil.
+  inline static jmethodID iceberg_accessor_get_ = nullptr;
+  inline static jmethodID list_get_ = nullptr;
+  inline static jmethodID boolean_value_ = nullptr;
+  inline static jmethodID int_value_ = nullptr;
+  inline static jmethodID long_value_ = nullptr;
+  inline static jmethodID char_sequence_to_string_ = nullptr;
+
+  /// TupleDescriptor received from the ScanNode.
+  const TupleDescriptor* tuple_desc_;
+
+  /// Accessor map for the scan result; pairs the slot ids with the Java Accessor
+  /// objects.
+  const std::unordered_map<int, jobject> jaccessors_;
+
+  /// Reads the value of a primitive from the StructLike, translates it to a matching
+  /// Impala type and writes it into the target tuple. The related Accessor objects are
+  /// stored in the jaccessors_ map and created during Prepare.
+  Status WriteBooleanSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  Status WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  Status WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  /// Iceberg TimeStamp is parsed into TimestampValue.
+  Status WriteTimeStampSlot(JNIEnv* env, jobject accessed_value, void* slot);
+  /// To obtain a character sequence from JNI the JniUtfCharGuard class is used. Then the
+  /// data has to be copied to the tuple_data_pool, because the JVM releases the
+  /// reference and reclaims the memory area.
+  Status WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
+      MemPool* tuple_data_pool);
+};
+
+}
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index d5ebed49d..d42ec883b 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -65,7 +65,7 @@ static const string SCHEDULER_WARNING_KEY("Scheduler Warning");
 static const vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE,
     TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
-    TPlanNodeType::KUDU_SCAN_NODE};
+    TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::ICEBERG_METADATA_SCAN_NODE};
 
 // Consistent scheduling requires picking up to k distinct candidates out of n nodes.
// Since each iteration can pick a node that it already picked (i.e. it is sampling with diff --git a/be/src/service/frontend.cc b/be/src/service/frontend.cc index 4dcfd2506..65f830e88 100644 --- a/be/src/service/frontend.cc +++ b/be/src/service/frontend.cc @@ -113,6 +113,8 @@ Frontend::Frontend() { {"getFunctions", "([B)[B", &get_functions_id_}, {"getTableHistory", "([B)[B", &get_table_history_id_}, {"getCatalogObject", "([B)[B", &get_catalog_object_id_}, + {"getCatalogTable", "([B)Lorg/apache/impala/catalog/FeTable;", + &get_catalog_table_id_}, {"getRoles", "([B)[B", &show_roles_id_}, {"getPrincipalPrivileges", "([B)[B", &get_principal_privileges_id_}, {"execHiveServer2MetadataOp", "([B)[B", &exec_hs2_metadata_op_id_}, @@ -253,6 +255,10 @@ Status Frontend::GetCatalogObject(const TCatalogObject& req, return JniUtil::CallJniMethod(fe_, get_catalog_object_id_, req, resp); } +Status Frontend::GetCatalogTable(const TTableName& table_name, jobject* result) { + return JniUtil::CallJniMethod(fe_, get_catalog_table_id_, table_name, result); +} + Status Frontend::GetExecRequest( const TQueryCtx& query_ctx, TExecRequest* result) { return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result); diff --git a/be/src/service/frontend.h b/be/src/service/frontend.h index 1df8dc44b..d450b0945 100644 --- a/be/src/service/frontend.h +++ b/be/src/service/frontend.h @@ -119,6 +119,10 @@ class Frontend { /// information on the error will be returned. Status GetCatalogObject(const TCatalogObject& request, TCatalogObject* response); + /// Gets the Java object of a Catalog table object. It can be used to call Java methods + /// of the Catalog Table object. + Status GetCatalogTable(const TTableName& table_name, jobject *result); + /// Call FE to get the roles. 
Status ShowRoles(const TShowRolesParams& params, TShowRolesResult* result); @@ -246,6 +250,7 @@ class Frontend { jmethodID get_functions_id_; // JniFrontend.getFunctions jmethodID get_table_history_id_; // JniFrontend.getTableHistory jmethodID get_catalog_object_id_; // JniFrontend.getCatalogObject + jmethodID get_catalog_table_id_; // JniFrontend.getCatalogTable jmethodID show_roles_id_; // JniFrontend.getRoles jmethodID get_principal_privileges_id_; // JniFrontend.getPrincipalPrivileges jmethodID exec_hs2_metadata_op_id_; // JniFrontend.execHiveServer2MetadataOp diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc index 0390714de..ed9159ba2 100644 --- a/be/src/service/impalad-main.cc +++ b/be/src/service/impalad-main.cc @@ -29,6 +29,8 @@ #include "common/status.h" #include "exec/hbase/hbase-table-scanner.h" #include "exec/hbase/hbase-table-writer.h" +#include "exec/iceberg-metadata/iceberg-metadata-scan-node.h" +#include "exec/iceberg-metadata/iceberg-row-reader.h" #include "exprs/hive-udf-call.h" #include "exprs/timezone_db.h" #include "gen-cpp/ImpalaService.h" @@ -64,6 +66,8 @@ int ImpaladMain(int argc, char** argv) { ABORT_IF_ERROR(HBaseTableScanner::Init()); ABORT_IF_ERROR(HBaseTable::InitJNI()); ABORT_IF_ERROR(HBaseTableWriter::InitJNI()); + ABORT_IF_ERROR(IcebergMetadataScanNode::InitJNI()); + ABORT_IF_ERROR(IcebergRowReader::InitJNI()); ABORT_IF_ERROR(HiveUdfCall::InitEnv()); ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI()); InitFeSupport(); diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc index 5a00e43c8..5b7dc19bd 100644 --- a/be/src/util/jni-util.cc +++ b/be/src/util/jni-util.cc @@ -112,6 +112,20 @@ bool JniUtil::MethodExists(JNIEnv* env, jclass class_ref, const char* method_str return true; } +Status JniUtil::GetMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref) { + *method_ref = env->GetMethodID(class_ref, method_str, method_signature); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +Status JniUtil::GetStaticMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref) { + *method_ref = env->GetStaticMethodID(class_ref, method_str, method_signature); + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) { *class_ref = NULL; jclass local_cl = env->FindClass(class_str); diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index f56ce404b..5005315bb 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -226,6 +226,8 @@ class JniCall { Status ObjectToResult(jobject obj, std::string* result) WARN_UNUSED_RESULT; + Status ObjectToResult(jobject obj, jobject* result) WARN_UNUSED_RESULT; + const jmethodID method_; JNIEnv* const env_; JniLocalFrame frame_; @@ -281,6 +283,16 @@ class JniUtil { static bool MethodExists(JNIEnv* env, jclass class_ref, const char* method_str, const char* method_signature); + /// Wrapper method around JNI's 'GetMethodID'. Returns the method reference for the + /// requested method. + static Status GetMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref); + + /// Wrapper method around JNI's 'GetStaticMethodID'. Returns the method reference for + /// the requested method. 
+ static Status GetStaticMethodID(JNIEnv* env, jclass class_ref, const char* method_str, + const char* method_signature, jmethodID* method_ref); + /// Returns a global JNI reference to the class specified by class_str into class_ref. /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have /// the lifetime of the impalad process). @@ -377,6 +389,10 @@ class JniUtil { static Status CallJniMethod(const jobject& obj, const jmethodID& method, R* response) WARN_UNUSED_RESULT; + template + static Status CallJniMethod(const jobject& obj, const jmethodID& method, + const T& arg, jobject* response) WARN_UNUSED_RESULT; + private: // Slow-path for GetJNIEnv, used on the first call by any thread. static JNIEnv* GetJNIEnvSlowPath(); @@ -442,6 +458,12 @@ inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method return JniCall::instance_method(obj, method).Call(response); } +template +inline Status JniUtil::CallJniMethod(const jobject& obj, const jmethodID& method, + const T& arg, jobject* response) { + return JniCall::instance_method(obj, method).with_thrift_arg(arg).Call(response); +} + inline JniCall::JniCall(jmethodID method) : method_(method), env_(JniUtil::GetJNIEnv()) { @@ -506,6 +528,14 @@ inline Status JniCall::ObjectToResult(jobject obj, std::string* result) { return Status::OK(); } +inline Status JniCall::ObjectToResult(jobject obj, jobject* result) { + DCHECK(obj) << "Call returned unexpected null Thrift object"; + JNIEnv* env = JniUtil::GetJNIEnv(); + if (env == nullptr) return Status("Failed to get/create JVM"); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, obj, result)); + return Status::OK(); +} + } #endif diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 120f6c66c..9914c66b5 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -52,6 +52,7 @@ enum TPlanNodeType { CARDINALITY_CHECK_NODE = 16 MULTI_AGGREGATION_NODE = 17 ICEBERG_DELETE_NODE = 18 + ICEBERG_METADATA_SCAN_NODE = 19 } // phases of an execution node @@ -679,6 +680,15 @@ struct TCardinalityCheckNode { 1: required string display_statement } +struct TIcebergMetadataScanNode { + // Tuple ID of the Tuple this scanner should scan. + 1: required Types.TTupleId tuple_id + // Name of the FeIcebergTable that will be used for the metadata table scan. + 2: required CatalogObjects.TTableName table_name + // The metadata table name specifies which metadata table has to be scanned. + 3: required string metadata_table_name; +} + // See PipelineMembership in the frontend for details. struct TPipelineMembership { 1: required Types.TPlanNodeId pipe_id @@ -705,38 +715,39 @@ struct TPlanNode { // node is codegen'd if the backend supports it. 
8: required bool disable_codegen - 27: required list pipelines + 9: required list pipelines // one field per PlanNode subclass - 9: optional THdfsScanNode hdfs_scan_node - 10: optional THBaseScanNode hbase_scan_node - 11: optional TKuduScanNode kudu_scan_node - 12: optional TDataSourceScanNode data_source_node - 13: optional TJoinNode join_node + 10: optional THdfsScanNode hdfs_scan_node + 11: optional THBaseScanNode hbase_scan_node + 12: optional TKuduScanNode kudu_scan_node + 13: optional TDataSourceScanNode data_source_node + 14: optional TJoinNode join_node 15: optional TAggregationNode agg_node 16: optional TSortNode sort_node 17: optional TUnionNode union_node 18: optional TExchangeNode exchange_node 19: optional TAnalyticNode analytic_node 20: optional TUnnestNode unnest_node + 21: optional TIcebergMetadataScanNode iceberg_scan_metadata_node // Label that should be used to print this node to the user. - 21: optional string label + 22: optional string label // Additional details that should be printed to the user. This is node specific // e.g. table name, join strategy, etc. - 22: optional string label_detail + 23: optional string label_detail // Estimated execution stats generated by the planner. - 23: optional ExecStats.TExecStats estimated_stats + 24: optional ExecStats.TExecStats estimated_stats // Runtime filters assigned to this plan node - 24: optional list runtime_filters + 25: optional list runtime_filters // Resource profile for this plan node. - 25: required ResourceProfile.TBackendResourceProfile resource_profile + 26: required ResourceProfile.TBackendResourceProfile resource_profile - 26: optional TCardinalityCheckNode cardinality_check_node + 27: optional TCardinalityCheckNode cardinality_check_node } // A flattened representation of a tree of PlanNodes, obtained by depth-first diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java index 224be7859..230cf351a 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java @@ -3618,7 +3618,7 @@ public class Analyzer { // The catalog table (the base of the virtual table) has been loaded and cached // under the name of the virtual table. 
FeTable catalogTable = getStmtTableCache().tables.get(virtualTableName); - if (catalogTable instanceof IcebergMetadataTable) return; + if (catalogTable instanceof IcebergMetadataTable || catalogTable == null) return; IcebergMetadataTable virtualTable = new IcebergMetadataTable(catalogTable, tblRefPath.get(2)); getStmtTableCache().tables.put(catalogTableName, catalogTable); diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java b/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java index 78823d002..294b7c56b 100644 --- a/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java +++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java @@ -29,6 +29,8 @@ import com.google.common.base.Preconditions; */ public class IcebergMetadataTableRef extends TableRef { + private String metadataTableName_; + public IcebergMetadataTableRef(TableRef tableRef, Path resolvedPath) { super(tableRef); Preconditions.checkState(resolvedPath.isResolved()); @@ -37,20 +39,28 @@ public class IcebergMetadataTableRef extends TableRef { resolvedPath_ = resolvedPath; IcebergMetadataTable iceMTbl = (IcebergMetadataTable)resolvedPath.getRootTable(); FeIcebergTable iceTbl = iceMTbl.getBaseTable(); + metadataTableName_ = iceMTbl.getMetadataTableName(); if (hasExplicitAlias()) return; aliases_ = new String[] { iceTbl.getTableName().toString().toLowerCase(), iceTbl.getName().toLowerCase()}; } + public String getMetadataTableName() { + return metadataTableName_; + } + @Override public void analyze(Analyzer analyzer) throws AnalysisException { if (isAnalyzed_) return; IcebergMetadataTable rootTable = (IcebergMetadataTable)resolvedPath_.getRootTable(); FeTable iceRootTable = rootTable.getBaseTable(); analyzer.registerAuthAndAuditEvent(iceRootTable, priv_, requireGrantOption_); + analyzeTimeTravel(analyzer); desc_ = analyzer.registerTableRef(this); isAnalyzed_ = true; + analyzeHints(analyzer); + analyzeJoin(analyzer); } } diff --git a/fe/src/main/java/org/apache/impala/analysis/Path.java b/fe/src/main/java/org/apache/impala/analysis/Path.java index 2f37489e4..f26c63ab0 100644 --- a/fe/src/main/java/org/apache/impala/analysis/Path.java +++ b/fe/src/main/java/org/apache/impala/analysis/Path.java @@ -29,6 +29,7 @@ import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.VirtualColumn; +import org.apache.impala.catalog.VirtualTable; import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.thrift.TVirtualColumnType; import org.apache.impala.util.AcidUtils; @@ -447,6 +448,9 @@ public class Path { } else { result.add(rootTable_.getDb().getName()); result.add(rootTable_.getName()); + if (rootTable_ instanceof VirtualTable) { + result.add(((IcebergMetadataTable)rootTable_).getMetadataTableName()); + } } result.addAll(rawPath_); return result; diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java index b7d0dcbe0..a92b07471 100644 --- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java @@ -27,7 +27,6 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.impala.analysis.TableName; import org.apache.impala.catalog.Column; -import org.apache.impala.catalog.FeCatalogUtils; 
import org.apache.impala.catalog.FeIcebergTable; import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.VirtualTable; @@ -45,7 +44,11 @@ import com.google.common.base.Preconditions; * table object based on the Iceberg API. */ public class IcebergMetadataTable extends VirtualTable { + + // The Iceberg table that is the base of the metadata table. private FeIcebergTable baseTable_; + + // Name of the metadata table. private String metadataTableName_; public IcebergMetadataTable(FeTable baseTable, String metadataTableTypeStr) @@ -53,7 +56,7 @@ public class IcebergMetadataTable extends VirtualTable { super(null, baseTable.getDb(), baseTable.getName(), baseTable.getOwnerUser()); Preconditions.checkArgument(baseTable instanceof FeIcebergTable); baseTable_ = (FeIcebergTable) baseTable; - metadataTableName_ = metadataTableTypeStr; + metadataTableName_ = metadataTableTypeStr.toUpperCase(); MetadataTableType type = MetadataTableType.from(metadataTableTypeStr.toUpperCase()); Preconditions.checkNotNull(type); Table metadataTable = MetadataTableUtils.createMetadataTableInstance( @@ -79,6 +82,10 @@ public class IcebergMetadataTable extends VirtualTable { return super.getFullName() + "." + metadataTableName_; } + public String getMetadataTableName() { + return metadataTableName_; + } + @Override public TableName getTableName() { return new TableName(db_.getName(), name_, metadataTableName_); @@ -105,7 +112,6 @@ public class IcebergMetadataTable extends VirtualTable { public TTableDescriptor toThriftDescriptor(int tableId, Set referencedPartitions) { TTableDescriptor desc = baseTable_.toThriftDescriptor(tableId, referencedPartitions); - desc.setColumnDescriptors(FeCatalogUtils.getTColumnDescriptors(this)); return desc; } diff --git a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java index 23601e098..2fa2b4445 100644 --- a/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/DistributedPlanner.java @@ -116,8 +116,13 @@ public class DistributedPlanner { PlanFragment result = null; if (root instanceof ScanNode) { - result = createScanFragment(root); - fragments.add(result); + if (root instanceof IcebergMetadataScanNode) { + result = createIcebergMetadataScanFragment(root); + fragments.add(result); + } else { + result = createScanFragment(root); + fragments.add(result); + } } else if (root instanceof HashJoinNode) { Preconditions.checkState(childFragments.size() == 2); result = createHashJoinFragment((HashJoinNode) root, @@ -152,9 +157,6 @@ public class DistributedPlanner { ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED); } else if (root instanceof CardinalityCheckNode) { result = createCardinalityCheckNodeFragment((CardinalityCheckNode) root, childFragments); - } else if (root instanceof IcebergMetadataScanNode) { - result = createIcebergMetadataScanFragment(root); - fragments.add(result); } else if (root instanceof IcebergDeleteNode) { Preconditions.checkState(childFragments.size() == 2); result = createIcebergDeleteFragment((IcebergDeleteNode) root, @@ -406,7 +408,8 @@ public class DistributedPlanner { } /** - * Create an Iceberg Metadata scan fragment. + * Create an Iceberg Metadata scan fragment. This fragment is UNPARTITIONED, so the + * scheduler can schedule it as coordinator only fragment. 
*/ private PlanFragment createIcebergMetadataScanFragment(PlanNode node) { return new PlanFragment(ctx_.getNextFragmentId(), node, DataPartition.UNPARTITIONED); diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java index 4a776786e..181cce8d3 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergMetadataScanNode.java @@ -17,28 +17,45 @@ package org.apache.impala.planner; +import java.util.List; + import org.apache.impala.analysis.Analyzer; -import org.apache.impala.analysis.TupleDescriptor; +import org.apache.impala.analysis.Expr; +import org.apache.impala.analysis.TableRef; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.ImpalaException; import org.apache.impala.thrift.TExplainLevel; +import org.apache.impala.thrift.TIcebergMetadataScanNode; import org.apache.impala.thrift.TPlanNode; +import org.apache.impala.thrift.TPlanNodeType; import org.apache.impala.thrift.TQueryOptions; import org.apache.impala.thrift.TScanRangeSpec; +import org.apache.impala.thrift.TTableName; import com.google.common.base.Preconditions; public class IcebergMetadataScanNode extends ScanNode { - protected IcebergMetadataScanNode(PlanNodeId id, TupleDescriptor desc) { - super(id, desc, "SCAN ICEBERG METADATA"); + // Metadata table name, it is stored here so it can be passed to the backend. + protected final String metadataTableName_; + + protected IcebergMetadataScanNode(PlanNodeId id, List conjuncts, + TableRef tblRef) { + super(id, tblRef.getDesc(), "SCAN ICEBERG METADATA"); + conjuncts_ = conjuncts; + metadataTableName_ = ((IcebergMetadataTable)tblRef.getTable()).getMetadataTableName(); } @Override public void init(Analyzer analyzer) throws ImpalaException { - super.init(analyzer); scanRangeSpecs_ = new TScanRangeSpec(); + assignConjuncts(analyzer); + conjuncts_ = orderConjunctsByCost(conjuncts_); + analyzer.materializeSlots(conjuncts_); computeMemLayout(analyzer); computeStats(analyzer); + numInstances_ = 1; + numNodes_ = 1; } @Override @@ -62,7 +79,12 @@ public class IcebergMetadataScanNode extends ScanNode { @Override protected void toThrift(TPlanNode msg) { - // Implement for fragment execution + msg.iceberg_scan_metadata_node = new TIcebergMetadataScanNode(); + msg.node_type = TPlanNodeType.ICEBERG_METADATA_SCAN_NODE; + msg.iceberg_scan_metadata_node.table_name = + new TTableName(desc_.getTableName().getDb(), desc_.getTableName().getTbl()); + msg.iceberg_scan_metadata_node.metadata_table_name = metadataTableName_; + msg.iceberg_scan_metadata_node.tuple_id = desc_.getId().asInt(); } @Override diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java index 0d4d37ca2..4b26a5132 100644 --- a/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/IcebergScanPlanner.java @@ -81,6 +81,7 @@ import org.apache.impala.catalog.TableLoadingException; import org.apache.impala.catalog.Type; import org.apache.impala.catalog.VirtualColumn; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.ImpalaRuntimeException; @@ -141,7 +142,8 @@ 
public class IcebergScanPlanner { public IcebergScanPlanner(Analyzer analyzer, PlannerContext ctx, TableRef iceTblRef, List conjuncts, MultiAggregateInfo aggInfo) throws ImpalaException { - Preconditions.checkState(iceTblRef.getTable() instanceof FeIcebergTable); + Preconditions.checkState(iceTblRef.getTable() instanceof FeIcebergTable || + iceTblRef.getTable() instanceof IcebergMetadataTable); analyzer_ = analyzer; ctx_ = ctx; tblRef_ = iceTblRef; diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java index 9437fddf6..03d573002 100644 --- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java @@ -71,6 +71,7 @@ import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.HdfsFileFormat; import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.TableLoadingException; +import org.apache.impala.catalog.iceberg.IcebergMetadataTable; import org.apache.impala.common.AnalysisException; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.InternalException; @@ -1872,7 +1873,6 @@ public class SingleNodePlanner { Expr.removeDuplicates(conjuncts); } - // TODO(todd) introduce FE interfaces for DataSourceTable, HBaseTable, KuduTable FeTable table = tblRef.getTable(); if (table instanceof FeFsTable) { if (table instanceof FeIcebergTable) { @@ -1897,16 +1897,18 @@ public class SingleNodePlanner { aggInfo, tblRef); scanNode.init(analyzer); return scanNode; + } else if (table instanceof IcebergMetadataTable) { + return createIcebergMetadataScanNode(tblRef, conjuncts, analyzer); } else { throw new NotImplementedException( "Planning not implemented for table class: " + table.getClass()); } } - private PlanNode createIcebergMetadataScanNode(TableRef tblRef, Analyzer analyzer) - throws ImpalaException { + private PlanNode createIcebergMetadataScanNode(TableRef tblRef, List conjuncts, + Analyzer analyzer) throws ImpalaException { IcebergMetadataScanNode icebergMetadataScanNode = - new IcebergMetadataScanNode(ctx_.getNextNodeId(), tblRef.getDesc()); + new IcebergMetadataScanNode(ctx_.getNextNodeId(), conjuncts, tblRef); icebergMetadataScanNode.init(analyzer); return icebergMetadataScanNode; } @@ -2224,7 +2226,7 @@ public class SingleNodePlanner { result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan()); result.init(analyzer); } else if (tblRef instanceof IcebergMetadataTableRef) { - result = createIcebergMetadataScanNode(tblRef, analyzer); + result = createScanNode(tblRef, aggInfo, analyzer); } else { throw new NotImplementedException( "Planning not implemented for table ref class: " + tblRef.getClass()); diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index da34d41ce..6bb6b7015 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -2490,14 +2490,6 @@ public class Frontend { return result; } - // Blocking query execution for queries that contain IcebergMetadataTables - for (FeTable table : stmtTableCache.tables.values()) { - if (table instanceof IcebergMetadataTable) { - throw new NotImplementedException(String.format("'%s' refers to a metadata " - + "table which is currently not supported.", table.getFullName())); - } - } - result.setQuery_exec_request(queryExecRequest); if 
(analysisResult.isQueryStmt()) { result.stmt_type = TStmtType.QUERY; diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index b672cb6a8..763256a0c 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -38,8 +38,10 @@ import org.apache.impala.authentication.saml.WrappedWebContext; import org.apache.impala.authorization.AuthorizationFactory; import org.apache.impala.authorization.ImpalaInternalAdminUser; import org.apache.impala.authorization.User; +import org.apache.impala.catalog.DatabaseNotFoundException; import org.apache.impala.catalog.FeDataSource; import org.apache.impala.catalog.FeDb; +import org.apache.impala.catalog.FeTable; import org.apache.impala.catalog.Function; import org.apache.impala.catalog.StructType; import org.apache.impala.catalog.Type; @@ -53,6 +55,7 @@ import org.apache.impala.service.Frontend.PlanCtx; import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TBuildTestDescriptorTableParams; import org.apache.impala.thrift.TCatalogObject; +import org.apache.impala.thrift.TColumnValue; import org.apache.impala.thrift.TDatabase; import org.apache.impala.thrift.TDescribeDbParams; import org.apache.impala.thrift.TDescribeOutputStyle; @@ -112,6 +115,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.lang.IllegalArgumentException; +import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; @@ -614,6 +618,13 @@ public class JniFrontend { frontend_.waitForCatalog(); } + FeTable getCatalogTable(byte[] tableNameParam) throws ImpalaException { + Preconditions.checkNotNull(frontend_); + TTableName tableName = new TTableName(); + JniUtil.deserializeThrift(protocolFactory_, tableName, tableNameParam); + return frontend_.getCatalog().getTable(tableName.db_name, tableName.table_name); + } + // Caching this saves ~50ms per call to getHadoopConfigAsHtml private static final Configuration CONF = new Configuration(); private static final Groups GROUPS = Groups.getUserToGroupsMappingService(CONF); diff --git a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java new file mode 100644 index 000000000..a2f7c5b23 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.util; + +import com.google.common.base.Preconditions; + +import org.apache.iceberg.Accessor; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.StructLike; +import org.apache.impala.catalog.FeIcebergTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterator; + +/** + * The metadata table scanner class to scan Iceberg metadata tables through the Iceberg + * API. This object is instantiated and governed by {IcebergMetadataScanNode} at the + * backend during scanning. + * + * Iceberg generally throws RuntimeExceptions, these have to be taken care of by the + * caller of {@code IcebergMetadataScanner}. + */ +public class IcebergMetadataScanner { + // FeTable object is extracted by the backend and passed when this object is created + private FeIcebergTable iceTbl_ = null; + + // Metadata table + private Table metadataTable_ = null; + + // Name of the metadata table + private String metadataTableName_; + + // Persist the file scan task iterator so we can continue after a RowBatch is full + private CloseableIterator fileScanTaskIterator_; + + // Persist the data rows iterator, so we can continue after a batch is filled + private CloseableIterator dataRowsIterator_; + + public IcebergMetadataScanner(FeIcebergTable iceTbl, String metadataTableName) { + Preconditions.checkNotNull(iceTbl); + this.iceTbl_ = (FeIcebergTable) iceTbl; + this.metadataTableName_ = metadataTableName; + } + + /** + * Iterates over the {{fileScanTaskIterator_}} to find a {FileScanTask} that has rows. + */ + public boolean FindFileScanTaskWithRows() { + while (fileScanTaskIterator_.hasNext()) { + DataTask dataTask = (DataTask)fileScanTaskIterator_.next(); + dataRowsIterator_ = dataTask.rows().iterator(); + if (dataRowsIterator_.hasNext()) return true; + } + return false; + } + + /** + * Creates the Metadata{Table} which is a predifined Iceberg {Table} object. This method + * also starts an Iceberg {TableScan} to scan the {Table}. After the scan is ready it + * initializes the iterators, so the {GetNext} call can start fetching the rows through + * the Iceberg Api. + */ + public void ScanMetadataTable() { + // Create and scan the metadata table + metadataTable_ = MetadataTableUtils.createMetadataTableInstance( + iceTbl_.getIcebergApiTable(), MetadataTableType.valueOf(metadataTableName_)); + TableScan scan = metadataTable_.newScan(); + // Init the FileScanTask iterator and DataRowsIterator + fileScanTaskIterator_ = scan.planFiles().iterator(); + FindFileScanTaskWithRows(); + } + + /** + * Returns the field {Accessor} for the specified column position. This {Accessor} is + * used to access a field in the {StructLike} object. + */ + public Accessor GetAccessor(int slotColPos) { + int fieldId = metadataTable_.schema().columns().get(slotColPos).fieldId(); + return metadataTable_.schema().accessorForField(fieldId); + } + + /** + * Returns the next available row of the scan result. The row is a {StructLike} object + * and its fields can be accessed with {Accessor}s. 
+ */ + public StructLike GetNext() { + // Return the next row in the DataRows iterator + if (dataRowsIterator_.hasNext()) { + return dataRowsIterator_.next(); + } + // Otherwise this DataTask is empty, find a FileScanTask that has a non-empty DataTask + if(FindFileScanTaskWithRows()) { + return dataRowsIterator_.next(); + } + return null; + } +} diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index 387618008..a15e5bd28 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -3758,6 +3758,23 @@ INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, fa ---- DATASET functional ---- BASE_TABLE_NAME +iceberg_query_metadata +---- CREATE +CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} ( + i int +) +STORED BY ICEBERG +LOCATION '/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata' +TBLPROPERTIES('format-version'='2'); +---- DEPENDENT_LOAD +INSERT INTO {db_name}{db_suffix}.{table_name} VALUES (1); +INSERT INTO {db_name}{db_suffix}.{table_name} VALUES (2); +INSERT INTO {db_name}{db_suffix}.{table_name} VALUES (3); +DELETE FROM {db_name}{db_suffix}.{table_name} WHERE i = 2; +==== +---- DATASET +functional +---- BASE_TABLE_NAME iceberg_lineitem_multiblock ---- CREATE CREATE EXTERNAL TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv index 2418404fc..972715384 100644 --- a/testdata/datasets/functional/schema_constraints.csv +++ b/testdata/datasets/functional/schema_constraints.csv @@ -96,6 +96,7 @@ table_name:iceberg_v2_partitioned_position_deletes_orc, constraint:restrict_to, table_name:iceberg_multiple_storage_locations, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_avro_format, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_mixed_file_format, constraint:restrict_to, table_format:parquet/none/none +table_name:iceberg_test_metadata, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none table_name:iceberg_lineitem_sixblocks, constraint:restrict_to, table_format:parquet/none/none diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test index f976cb75e..a605c94b8 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test @@ -1,102 +1,116 @@ -explain SELECT * FROM functional_parquet.iceberg_alltypes_part_orc.history +SELECT * FROM functional_parquet.iceberg_alltypes_part_orc.history ---- PLAN PLAN-ROOT SINK | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY] row-size=33B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -01:EXCHANGE [UNPARTITIONED] -| -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY] row-size=33B cardinality=unavailable ==== -explain select * +select * from 
functional_parquet.iceberg_alltypes_part_orc.history q join functional_parquet.iceberg_alltypes_part_orc.history z on z.snapshot_id = q.snapshot_id ---- PLAN PLAN-ROOT SINK | -02:NESTED LOOP JOIN [CROSS JOIN] +02:HASH JOIN [INNER JOIN] +| hash predicates: q.snapshot_id = z.snapshot_id | row-size=66B cardinality=unavailable | -|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] | row-size=33B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=33B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -04:EXCHANGE [UNPARTITIONED] -| -02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: q.snapshot_id = z.snapshot_id | row-size=66B cardinality=unavailable | -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [UNPARTITIONED] | | -| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] | row-size=33B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=33B cardinality=unavailable ==== -explain select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 +select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 from functional_parquet.iceberg_alltypes_part_orc.history q join functional_parquet.iceberg_alltypes_part_orc.history z on z.snapshot_id = q.snapshot_id ---- PLAN PLAN-ROOT SINK | -02:NESTED LOOP JOIN [CROSS JOIN] -| row-size=24B cardinality=unavailable +02:HASH JOIN [INNER JOIN] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable | -|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] -| row-size=16B cardinality=unavailable +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=8B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | -04:EXCHANGE [UNPARTITIONED] +02:HASH JOIN [INNER JOIN, BROADCAST] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable | -02:NESTED LOOP JOIN [CROSS JOIN, BROADCAST] -| row-size=24B cardinality=unavailable -| -|--03:EXCHANGE [BROADCAST] +|--03:EXCHANGE [UNPARTITIONED] | | -| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history z] -| row-size=16B cardinality=unavailable +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.history q] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] row-size=8B cardinality=unavailable ==== -explain select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries +select q.snapshot_id, z.made_current_at as test1, z.made_current_at as test2 +from functional_parquet.iceberg_alltypes_part_orc.history q + join /* +SHUFFLE */ 
functional_parquet.iceberg_alltypes_part_orc.history z + on z.snapshot_id = q.snapshot_id ---- PLAN PLAN-ROOT SINK | -01:SUBPLAN -| row-size=98B cardinality=unavailable +02:HASH JOIN [INNER JOIN] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable | -|--04:NESTED LOOP JOIN [CROSS JOIN] -| | row-size=98B cardinality=10 -| | -| |--02:SINGULAR ROW SRC -| | row-size=72B cardinality=1 -| | -| 03:UNNEST [a.partition_summaries] -| row-size=0B cardinality=10 +|--01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.manifests a] - row-size=72B cardinality=unavailable +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] + row-size=8B cardinality=unavailable ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | 05:EXCHANGE [UNPARTITIONED] | +02:HASH JOIN [INNER JOIN, PARTITIONED] +| hash predicates: q.snapshot_id = z.snapshot_id +| row-size=32B cardinality=unavailable +| +|--04:EXCHANGE [HASH(z.snapshot_id)] +| | +| 01:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY z] +| row-size=24B cardinality=unavailable +| +03:EXCHANGE [HASH(q.snapshot_id)] +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q] + row-size=8B cardinality=unavailable +==== +select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries +---- PLAN +PLAN-ROOT SINK +| 01:SUBPLAN | row-size=98B cardinality=unavailable | @@ -109,6 +123,23 @@ PLAN-ROOT SINK | 03:UNNEST [a.partition_summaries] | row-size=0B cardinality=10 | -00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.manifests a] +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a] + row-size=72B cardinality=unavailable +---- DISTRIBUTEDPLAN +PLAN-ROOT SINK +| +01:SUBPLAN +| row-size=98B cardinality=unavailable +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | row-size=98B cardinality=10 +| | +| |--02:SINGULAR ROW SRC +| | row-size=72B cardinality=1 +| | +| 03:UNNEST [a.partition_summaries] +| row-size=0B cardinality=10 +| +00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a] row-size=72B cardinality=unavailable ==== \ No newline at end of file diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test index ee1059e41..5a9928451 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test @@ -1,132 +1,456 @@ +# The test table for these tests are created during dataload by Impala. An existing table +# could not have been rewritten manually, because avrotools removes additional schemata +# from the manifests files that Iceberg adds. Therefore, the query results are checked +# with regexp. +#### +# Test 0 : Query all the metadata tables once +#### ==== ---- QUERY -# List of all metadata tables in current version -select * from functional_parquet.iceberg_alltypes_part_orc.entries ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.entries' refers to a metadata table which is currently not supported. 
+select * from functional_parquet.iceberg_query_metadata.entries; +---- RESULTS +# Example: +# 1,8283026816932323050,3,3 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 ---- TYPES -STRING +INT,BIGINT,BIGINT,BIGINT ==== ---- QUERY -# 'Files' is a keyword and need to be escaped -select * from functional_parquet.iceberg_alltypes_part_orc.`files` ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.`files`; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/754b1471ee8d8aa2-4f2f33ef00000000_134436143_data.0.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.data_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.data_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.data_files; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/944a2355e618932f-18f086b600000000_1283312202_data.0.parq','PARQUET',0,1,351,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.delete_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.delete_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.delete_files; +---- RESULTS +# Example: +# 1,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/delete-1f43b217940cc094-fedf515600000000_248998721_data.0.parq','PARQUET',0,1,1489,'NULL',NULL +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.history ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. 
+select * from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.523000000,9046920472784493998,8491702501245661704,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true ---- TYPES -STRING +TIMESTAMP,BIGINT,BIGINT,BOOLEAN ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.snapshots ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.snapshots' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.metadata_log_entries; +---- RESULTS +# Example: +# 2023-08-16 12:18:11.061000000,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/00000-0ae98ebd-b200-4381-9d97-1f93954423a9.metadata.json',NULL,NULL,NULL +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',NULL,NULL,NULL +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,1 +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,2 +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,3 +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.metadata.json',[1-9]\d*|0,0,4 ---- TYPES -STRING +TIMESTAMP,STRING,BIGINT,INT,BIGINT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.manifests ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.manifests' refers to a metadata table which is currently not supported. 
+select * from functional_parquet.iceberg_query_metadata.snapshots; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.322000000,8491702501245661704,NULL,'append','hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/snap-8491702501245661704-1-88a39285-529f-41a4-bd69-6d2560fac64e.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,'append','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'append','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'append','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'overwrite','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' ---- TYPES -STRING +TIMESTAMP,BIGINT,BIGINT,STRING,STRING ==== ---- QUERY -# 'Partitions' is a keyword and need to be escaped -select * from functional_parquet.iceberg_alltypes_part_orc.`partitions` ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.partitions' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.refs; +---- RESULTS +row_regex:'main','BRANCH',[1-9]\d*|0,NULL,NULL,NULL ---- TYPES -STRING +STRING,STRING,BIGINT,BIGINT,INT,BIGINT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_data_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_data_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.manifests; +---- RESULTS +# Example: +# row_regex:0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/38e5a1bd-5b7f-4eae-9362-16a2de3c575d-m0.avro',6631,0,8283026816932323050,1,0,0,0,0,0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,0,0,0,1,0,0 ---- TYPES -STRING +INT,STRING,BIGINT,INT,BIGINT,INT,INT,INT,INT,INT,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_files ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_files' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.`partitions`; +---- RESULTS +3,3,1,1,0,0 ---- TYPES -STRING +BIGINT,INT,BIGINT,INT,BIGINT,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_manifests ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_manifests' refers to a metadata table which is currently not supported. 
+select * from functional_parquet.iceberg_query_metadata.all_data_files; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/944a2355e618932f-18f086b600000000_1283312202_data.0.parq','PARQUET',0,1,351,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -select * from functional_parquet.iceberg_alltypes_part_orc.all_entries ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_entries' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.all_delete_files; +---- RESULTS +# Example: +# 1,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/delete-1f43b217940cc094-fedf515600000000_248998721_data.0.parq','PARQUET',0,1,1489,'NULL',NULL +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -# Select list with column name -select snapshot_id from functional_parquet.iceberg_alltypes_part_orc.history ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.all_files; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/data/3d481ed88b2941f0-ea33816200000000_1109948289_data.0.parq','PARQUET',0,1,351,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'',0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq','PARQUET',0,1,[1-9]\d*|0,'NULL',NULL ---- TYPES -STRING +INT,STRING,STRING,INT,BIGINT,BIGINT,BINARY,INT ==== ---- QUERY -# Joining tables -select * -from functional_parquet.iceberg_alltypes_part_orc.history q - join functional_parquet.iceberg_alltypes_part_orc.snapshots z - on z.snapshot_id = q.snapshot_id ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. 
+select * from functional_parquet.iceberg_query_metadata.all_manifests; +---- RESULTS +# Example: +# 0,'hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/38e5a1bd-5b7f-4eae-9362-16a2de3c575d-m0.avro',6631,0,8283026816932323050,1,0,0,0,0,0,7858675898458780516 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,1,0,0,0,0,0,[1-9]\d*|0 +row_regex:1,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro',[1-9]\d*|0,0,[1-9]\d*|0,0,0,0,1,0,0,[1-9]\d*|0 ---- TYPES -STRING +INT,STRING,BIGINT,INT,BIGINT,INT,INT,INT,INT,INT,INT,BIGINT ==== ---- QUERY -# Inline query -select x.snapshot_id -from (select * from functional_parquet.iceberg_alltypes_part_orc.history) x; ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported. +select * from functional_parquet.iceberg_query_metadata.all_entries; +---- RESULTS +# Example: +# 1,7858675898458780516,4,4 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 +row_regex:1,[1-9]\d*|0,[1-9]\d*|0,[1-9]\d*|0 ---- TYPES -STRING +INT,BIGINT,BIGINT,BIGINT + +#### +# Test 1 : Test select list +#### ==== ---- QUERY -# Complext type -select *, a.partition_summaries.pos from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.manifests' refers to a metadata table which is currently not supported. +select snapshot_id from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +# Example: +# 7858675898458780516 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 ---- TYPES -STRING +BIGINT ==== ---- QUERY -# Using complex type 'map' column without a join -select summary from functional_parquet.iceberg_alltypes_part_orc.snapshots; ----- CATCH -NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.snapshots' refers to a metadata table which is currently not supported. 
+select snapshot_id, * from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +# Example: +# 7858675898458780516,2023-08-16 12:18:18.584000000,7858675898458780516,8283026816932323050,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:[1-9]\d*|0,\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true ---- TYPES -STRING +BIGINT,TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +select count(*) from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +4 +---- TYPES +BIGINT +==== +---- QUERY +select record_count + file_count from functional_parquet.iceberg_query_metadata.`partitions`; +---- RESULTS +6 +---- TYPES +BIGINT + +#### +# Test 2 : Test filtering +#### +==== +---- QUERY +# Test BIGINT +select * from functional_parquet.iceberg_query_metadata.history +where snapshot_id = $OVERWRITE_SNAPSHOT_ID; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.523000000,9046920472784493998,8491702501245661704,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,$OVERWRITE_SNAPSHOT_ID,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +# Test BOOLEAN +select * from functional_parquet.iceberg_query_metadata.history +where is_current_ancestor = true; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.523000000,9046920472784493998,8491702501245661704,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +# Test STRING +select * from functional_parquet.iceberg_query_metadata.snapshots +where operation = 'overwrite'; +---- RESULTS +# Example: +# 2023-08-16 12:18:15.322000000,8491702501245661704,NULL,'append','hdfs://localhost:20500/test-warehouse/functional_parquet.db/iceberg_test_metadata/metadata/snap-8491702501245661704-1-88a39285-529f-41a4-bd69-6d2560fac64e.avro' +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,'overwrite','$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/metadata/.*.avro' +---- TYPES +TIMESTAMP,BIGINT,BIGINT,STRING,STRING +==== +---- QUERY +# Test TIMESTAMP +select * from functional_parquet.iceberg_query_metadata.history +where made_current_at = cast("$OVERWRITE_SNAPSHOT_TS" as timestamp); +---- RESULTS +row_regex:$OVERWRITE_SNAPSHOT_TS,$OVERWRITE_SNAPSHOT_ID,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN +==== +---- QUERY +# Test conjunct slot materialization +select snapshot_id from functional_parquet.iceberg_query_metadata.snapshots +where operation = 'overwrite'; +---- RESULTS +$OVERWRITE_SNAPSHOT_ID +---- TYPES +BIGINT +==== +---- QUERY +# Test an expression rewrite: OR -> IN () +select * from functional_parquet.iceberg_query_metadata.history +where snapshot_id = $OVERWRITE_SNAPSHOT_ID or snapshot_id = 1; +---- RESULTS +row_regex:$OVERWRITE_SNAPSHOT_TS,$OVERWRITE_SNAPSHOT_ID,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN + +#### +# Test 2 : Test joins +#### +==== +---- QUERY +select a.snapshot_id, b.snapshot_id from 
functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.history b on a.snapshot_id = b.snapshot_id; +---- RESULTS +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +---- TYPES +BIGINT,BIGINT +==== +---- QUERY +select a.snapshot_id, b.parent_id from functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.history b on a.snapshot_id = b.snapshot_id; +---- RESULTS +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +row_regex:[1-9]\d*|0,[1-9]\d*|0 +---- TYPES +BIGINT,BIGINT +==== +---- QUERY +select count(b.parent_id) from functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.history b on a.snapshot_id = b.snapshot_id; +---- RESULTS +3 +---- TYPES +BIGINT +==== +---- QUERY +select a.snapshot_id from functional_parquet.iceberg_query_metadata.history a +join functional_parquet.iceberg_query_metadata.snapshots b on a.snapshot_id = b.snapshot_id +where a.snapshot_id = $OVERWRITE_SNAPSHOT_ID; +---- RESULTS +$OVERWRITE_SNAPSHOT_ID +---- TYPES +BIGINT + +#### +# Test 3 : Inline query +#### +==== +---- QUERY +select a.snapshot_id +from (select * from functional_parquet.iceberg_query_metadata.history) a; +---- RESULTS +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +row_regex:[1-9]\d*|0 +---- TYPES +BIGINT + +#### +# Test 4 : Complex types +# Currently not supported, complex type slots are set to NULL (IMPALA-12205) +#### +==== +---- QUERY +select snapshot_id, summary from functional_parquet.iceberg_query_metadata.snapshots; +---- RESULTS +row_regex:[1-9]\d*|0,'NULL' +row_regex:[1-9]\d*|0,'NULL' +row_regex:[1-9]\d*|0,'NULL' +row_regex:[1-9]\d*|0,'NULL' +---- TYPES +BIGINT,STRING + +#### +# Test 5 : Multiple RowBatch results +#### +==== +---- QUERY +set BATCH_SIZE=1; +select * from functional_parquet.iceberg_query_metadata.history; +---- RESULTS +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,NULL,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +row_regex:\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}(\.\d{9})?,[1-9]\d*|0,[1-9]\d*|0,true +---- TYPES +TIMESTAMP,BIGINT,BIGINT,BOOLEAN + + +#### +# Test 6 : Timetravel +# Timetravel is not supported currently, related Jira IMPALA-11991. +#### +==== +---- QUERY +select * from functional_parquet.iceberg_query_metadata.snapshots FOR SYSTEM_VERSION AS OF $OVERWRITE_SNAPSHOT_ID; +---- CATCH +AnalysisException: FOR SYSTEM_VERSION AS OF clause is only supported for Iceberg tables. functional_parquet.iceberg_query_metadata.SNAPSHOTS is not an Iceberg table. +==== + +#### +# Test 7 : Use-cases +#### +==== +---- QUERY +# All reachable manifest files size +select sum(length) from functional_parquet.iceberg_query_metadata.all_manifests; +---- RESULTS +row_regex:[1-9]\d*|0 +---- TYPES +BIGINT +==== +---- QUERY +# How many manifests? 
+SELECT count(*) FROM functional_parquet.iceberg_query_metadata.manifests; +---- RESULTS +4 +---- TYPES +BIGINT +==== +---- QUERY +# Join metadata table with table +SELECT i, INPUT__FILE__NAME, file_size_in_bytes from functional_parquet.iceberg_query_metadata tbl +JOIN functional_parquet.iceberg_query_metadata.all_files mtbl on tbl.input__file__name = mtbl.file_path; +---- RESULTS +row_regex:[1-9]\d*|0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',[1-9]\d*|0 +row_regex:[1-9]\d*|0,'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',[1-9]\d*|0 +---- TYPES +INT,STRING,BIGINT + +#### +# Test 8 : Invalid operations +# In most cases the parser catches the table reference. +#### +==== +---- QUERY +describe functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +AnalysisException: Could not resolve path: 'functional_parquet.iceberg_query_metadata.snapshots' +==== +---- QUERY +show create table functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +insert into table functional_parquet.iceberg_query_metadata.snapshots values (1); +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +refresh functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +invalidate metadata functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +drop table functional_parquet.iceberg_query_metadata.snapshots; +---- CATCH +ParseException: Syntax error in line 1 +==== +---- QUERY +alter table functional_parquet.iceberg_query_metadata.snapshots add columns (col int); +---- CATCH +ParseException: Syntax error in line 1 ==== \ No newline at end of file diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py index dce1716c0..bdb14a0e0 100644 --- a/tests/authorization/test_ranger.py +++ b/tests/authorization/test_ranger.py @@ -2032,6 +2032,44 @@ class TestRanger(CustomClusterTestSuite): admin_client.execute("drop database if exists {0} cascade".format(unique_database), user=ADMIN) + @pytest.mark.execute_serially + @CustomClusterTestSuite.with_args( + impalad_args=IMPALAD_ARGS, catalogd_args=CATALOGD_ARGS) + def test_iceberg_metadata_table_privileges(self, unique_name): + user = getuser() + admin_client = self.create_impala_client() + non_admin_client = self.create_impala_client() + short_table_name = "ice_1" + unique_database = unique_name + "_db" + tbl_name = unique_database + "." 
+ short_table_name + + try: + admin_client.execute("drop database if exists {0} cascade" + .format(unique_database), user=ADMIN) + admin_client.execute("create database {0}".format(unique_database), user=ADMIN) + admin_client.execute("create table {0} (a int) stored as iceberg" + .format(tbl_name), user=ADMIN) + + # At this point, non-admin user without select privileges cannot query the metadata + # tables + result = self.execute_query_expect_failure(non_admin_client, + "select * from {0}.history".format(tbl_name), user=user) + assert "User '{0}' does not have privileges to execute 'SELECT' on: {1}".format( + user, unique_database) in str(result) + + # Grant 'user' select privilege on the table + admin_client.execute("grant select on table {0} to user {1}".format(tbl_name, user), + user=ADMIN) + result = non_admin_client.execute("select * from {0}.history".format(tbl_name), + user=user) + assert result.success is True + + finally: + admin_client.execute("revoke select on table {0} from user {1}" + .format(tbl_name, user), user=ADMIN) + admin_client.execute("drop database if exists {0} cascade".format(unique_database), + user=ADMIN) + @pytest.mark.execute_serially @SkipIf.is_test_jdk @SkipIfFS.hive diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index d6317d268..c47772bf4 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -1213,8 +1213,17 @@ class TestIcebergV2Table(IcebergTestSuite): use_db="functional_parquet") def test_metadata_tables(self, vector): - self.run_test_case('QueryTest/iceberg-metadata-tables', vector, - use_db="functional_parquet") + with self.create_impala_client() as impalad_client: + overwrite_snapshot_id = impalad_client.execute("select snapshot_id from " + "functional_parquet.iceberg_query_metadata.snapshots " + "where operation = 'overwrite';") + overwrite_snapshot_ts = impalad_client.execute("select committed_at from " + "functional_parquet.iceberg_query_metadata.snapshots " + "where operation = 'overwrite';") + self.run_test_case('QueryTest/iceberg-metadata-tables', vector, + use_db="functional_parquet", + test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]), + '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])}) def test_delete(self, vector, unique_database): self.run_test_case('QueryTest/iceberg-delete', vector,