IMPALA-14092 Part2: Support querying of paimon data table via JNI

This patch implements querying of Paimon data tables
through a JNI-based scanner.

Features implemented:
- Column pruning.
Partition pruning and predicate pushdown will be submitted
as the third part of this patch series.

We implemented this by treating the Paimon table as a normal
unpartitioned table. When querying a Paimon table:
- PaimonScanNode decides which Paimon splits need to be scanned,
  and then transfers the splits to the BE for the JNI-based scan
  operation.

- We also collect the required columns that need to be scanned,
  and pass them to the scanner for column pruning. This is
  implemented by passing the field ids of the columns to the BE
  instead of column positions, to support schema evolution (see
  the field-id sketch after this list).

- In the original implementation, PaimonJniScanner directly
  passed Paimon row objects to the BE and called the
  corresponding Paimon row field accessors, which are Java
  methods that convert row fields to Impala row batch tuples.
  We found this slow due to the overhead of JVM method calls.
  To minimize the overhead, we reworked the implementation:
  PaimonJniScanner now converts the Paimon row batches to an
  arrow RecordBatch, which stores its data in the offheap
  region of the Impala JVM, and passes the offheap record
  batch memory pointer to the BE. The BE PaimonJniScanNode
  reads the data directly from the JVM offheap region and
  converts the arrow record batch to an Impala row batch
  (see the import sketch after this list).

  Benchmarks show the new implementation is roughly 2x faster
  than the original one.

  The lifecycle of the arrow record batch is as follows:
  the batch is generated in the FE and passed to the BE.
  Once the record batch is successfully imported into the BE,
  the BE is in charge of freeing it. There are two free paths:
  the normal path and the exception path. On the normal path,
  when the arrow batch is fully consumed by the BE, the BE
  calls into JNI to fetch the next batch, and the previous
  batch is freed automatically. The exception path is taken
  when the query is cancelled or memory allocation fails; in
  these corner cases the batch is freed in the Close() method
  if it has not been fully consumed by the BE.
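
  As a minimal illustration of why field ids survive schema
  evolution while positions do not, consider the hypothetical,
  self-contained C++ sketch below; the table, ids, and helper are
  illustrative only and merely mirror the idea behind
  CollectProjectionFieldIds in this patch:

  #include <cstdint>
  #include <iostream>
  #include <string>
  #include <vector>

  // Hypothetical column metadata: a stable field id plus a name.
  struct Column { int32_t field_id; std::string name; };

  // Resolves a projected field id to its current position in the schema.
  int ResolvePosition(const std::vector<Column>& schema, int32_t fid) {
    for (size_t pos = 0; pos < schema.size(); ++pos) {
      if (schema[pos].field_id == fid) return static_cast<int>(pos);
    }
    return -1;  // the field id no longer exists
  }

  int main() {
    // Schema v1: (id INT, name STRING) with field ids 1 and 2.
    std::vector<Column> v1 = {{1, "id"}, {2, "name"}};
    // Schema v2 after dropping column 'id': 'name' shifts to
    // position 0, but its field id is still 2.
    std::vector<Column> v2 = {{2, "name"}};
    // Projecting field id 2 selects 'name' in both versions, while a
    // positional projection would break after the drop.
    std::cout << ResolvePosition(v1, 2) << " " << ResolvePosition(v2, 2)
              << std::endl;  // prints: 1 0
    return 0;
  }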
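
  The import sketch below shows the BE-side import and release
  discipline described above. It is a minimal sketch, assuming the
  JNI call has already produced the two Arrow C Data Interface
  pointers; the BatchHolder class is illustrative, while
  arrow::ImportRecordBatch is the real API this patch uses:

  #include <iostream>
  #include <memory>
  #include <arrow/api.h>
  #include <arrow/c/bridge.h>

  // Illustrative holder: the imported RecordBatch owns the JVM offheap
  // buffers through the C Data Interface release callbacks.
  class BatchHolder {
   public:
    // Normal path: importing the next batch replaces the previous one;
    // the shared_ptr destructor runs the release callback and frees the
    // old offheap memory automatically.
    arrow::Status ImportAndHold(
        struct ArrowArray* c_array, struct ArrowSchema* c_schema) {
      ARROW_ASSIGN_OR_RAISE(batch_, arrow::ImportRecordBatch(c_array, c_schema));
      std::cout << "imported rows=" << batch_->num_rows() << std::endl;
      return arrow::Status::OK();
    }
    // Exception path (cancellation, failed allocation): free eagerly in
    // Close() so a partially consumed batch cannot leak.
    void Close() { batch_.reset(); }
   private:
    std::shared_ptr<arrow::RecordBatch> batch_;
  };

  int main() {
    BatchHolder holder;
    holder.Close();  // releasing with no batch held is a no-op
    return 0;
  }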

Currently supported Impala data types for queries:
- BOOLEAN
- TINYINT
- SMALLINT
- INTEGER
- BIGINT
- FLOAT
- DOUBLE
- STRING
- DECIMAL(P,S)
- TIMESTAMP
- CHAR(N)
- VARCHAR(N)
- BINARY
- DATE

TODO:
    - Patches pending submission:
        - Support tpcds/tpch data loading
          for Paimon data tables.
        - Virtual column support for querying
          Paimon data tables.
        - Query support with time travel.
        - Query support for Paimon meta tables.
    - WIP:
        - Snapshot incremental read.
        - Complex type query support.
        - Native Paimon table scanner, instead of
          the JNI-based one.

Testing:
    - Created test tables in functional_schema_template.sql.
    - Added TestPaimonScannerWithLimit in test_scanners.py.
    - Added test_paimon_query in test_paimon.py.
    - Already passed the tpcds/tpch tests for Paimon tables. The
      test table data is currently generated by Spark, which the
      Impala data loading framework does not support yet; we had
      to do this because Hive does not support generating Paimon
      tables with dynamic partitioning. We plan to submit a
      separate patch for tpcds/tpch data loading and the
      associated tpcds/tpch query tests.
    - JVM offheap memory leak tests: ran looped tpch tests for
      1 day; no obvious offheap memory increase was observed, and
      offheap memory usage stayed within 10 MB.

Change-Id: Ie679a89a8cc21d52b583422336b9f747bdf37384
Reviewed-on: http://gerrit.cloudera.org:8080/23613
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Zoltan Borok-Nagy <boroknagyz@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
Author: jichen0919
Date: 2025-10-31 12:32:48 +08:00
Committer: Riza Suminto
Parent: f8d21f34bc
Commit: 7e29ac23da
130 changed files with 3604 additions and 52 deletions


@@ -528,6 +528,7 @@ set (IMPALA_LIBS
ExecSequence
ExecText
ExecIcebergMetadata
ExecPaimon
Exprs
ExprsIr
GlobalFlags
@@ -681,7 +682,8 @@ set (IMPALA_DEPENDENCIES
java_jvm
kudu_client
cctz
curl)
curl
arrow)
# When building with Clang, linking fails because it is trying to
# use a symbol in kudu_client, but that symbol is discarded. To


@@ -25,6 +25,7 @@ add_subdirectory(rcfile)
add_subdirectory(sequence)
add_subdirectory(text)
add_subdirectory(iceberg-metadata)
add_subdirectory(paimon)
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")


@@ -44,6 +44,7 @@
#include "exec/kudu/kudu-scan-node.h"
#include "exec/kudu/kudu-util.h"
#include "exec/nested-loop-join-node.h"
#include "exec/paimon/paimon-scan-plan-node.h"
#include "exec/partial-sort-node.h"
#include "exec/partitioned-hash-join-node.h"
#include "exec/select-node.h"
@@ -239,6 +240,9 @@ Status PlanNode::CreatePlanNode(
case TPlanNodeType::ICEBERG_MERGE_NODE:
*node = pool->Add(new IcebergMergePlanNode());
break;
case TPlanNodeType::PAIMON_SCAN_NODE:
*node = pool->Add(new PaimonScanPlanNode());
break;
default:
map<int, const char*>::const_iterator i =
_TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
@@ -432,6 +436,7 @@ void ExecNode::CollectScanNodes(vector<ExecNode*>* nodes) {
CollectNodes(TPlanNodeType::HDFS_SCAN_NODE, nodes);
CollectNodes(TPlanNodeType::HBASE_SCAN_NODE, nodes);
CollectNodes(TPlanNodeType::KUDU_SCAN_NODE, nodes);
CollectNodes(TPlanNodeType::PAIMON_SCAN_NODE, nodes);
}
Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* state) {


@@ -0,0 +1,108 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# where to put generated libraries
set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/paimon")
# where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec/paimon")
add_library(ExecPaimon
paimon-jni-row-reader.cc
paimon-jni-scan-node.cc
paimon-jni-scanner.cc
paimon-scan-plan-node.cc
)
# Below is a workaround for a clang compile issue. A normal build compiles
# successfully, but the issue occurs when CMAKE_BUILD_TYPE is one of:
# ADDRESS_SANITIZER, TIDY, UBSAN, UBSAN_FULL, TSAN, TSAN_FULL.
# A normal build invokes g++; the build types above use clang++ instead, in
# which case the following error occurs:
# error: cannot cast private base class 'std::__detail::__variant::_Variant_storage
# <false, arrow::FieldPath, std::__cxx11::basic_string<char>,
# std::vector<arrow::FieldRef, std::allocator<arrow::FieldRef> > >' to
# 'std::variant<arrow::FieldPath, std::__cxx11::basic_string<char>,
# std::vector<arrow::FieldRef, std::allocator<arrow::FieldRef> > >'
# return static_cast<variant<_Types...>&>(__rhs);
#
# This looks like https://bugs.llvm.org/show_bug.cgi?id=31852, and the
# following patch is already applied in the libstdc++ <variant> header of
# our toolchain:
# https://gcc.gnu.org/cgit/gcc/commit/?id=aafaa3254ec6fc3d5e3a15a40185950d3af04432
# However, that patch only fixes std::get; it does not fix the static_cast
# path, so we apply the same workaround to the static_cast here.
# NOTE: the fix below only applies to the current toolchain version and
# needs to be adapted if the toolchain is upgraded.
set(PAIMON_PATCH_GCC_HEADER_DIR
gcc-$ENV{IMPALA_GCC_VERSION}/include/c++/$ENV{IMPALA_GCC_VERSION})
set(PAIMON_PATCH_WORK_DIR
$ENV{IMPALA_TOOLCHAIN_PACKAGES_HOME}/${PAIMON_PATCH_GCC_HEADER_DIR})
set(PAIMON_PATCH_TARGET_FILE
${PAIMON_PATCH_WORK_DIR}/variant)
# Change this if an upgraded toolchain still hasn't fixed the issue.
set(PAIMON_PATCH_TARGET_FILE_CHECKSUM_ORIG
4daf8153a09ee07bab2ac339e21b9725e17a40854a42284c85b3d2ba3c0862e3)
set(PAIMON_PATCH_SENTINEL_FILE
${PAIMON_PATCH_WORK_DIR}/variant_patched)
file(SHA256 ${PAIMON_PATCH_TARGET_FILE}
PAIMON_PATCH_TARGET_FILE_CHECKSUM_CURR)
message("Current Hash Is: "
${PAIMON_PATCH_TARGET_FILE_CHECKSUM_CURR})
message("Original Hash Is: "
${PAIMON_PATCH_TARGET_FILE_CHECKSUM_ORIG})
if(PAIMON_PATCH_TARGET_FILE_CHECKSUM_ORIG STREQUAL PAIMON_PATCH_TARGET_FILE_CHECKSUM_CURR
)
message(STATUS "apply variant patch to fix clang++ static_cast issue.")
# To fix the compile error, the following tiny patch should be applied to
# toolchain-packages-gcc10.4.0/gcc-10.4.0/include/c++/10.4.0/variant
file(WRITE /tmp/variant.patch
"1296a1297,1299
> #if defined(__clang__) && __clang_major__ <= 7
> : public __detail::__variant::_Variant_base<_Types...>,
> #else
1297a1301
> #endif
")
add_custom_command(
OUTPUT ${PAIMON_PATCH_SENTINEL_FILE}
COMMAND cd ${PAIMON_PATCH_WORK_DIR} &&
patch -b -i /tmp/variant.patch ${PAIMON_PATCH_TARGET_FILE} &&
touch ${PAIMON_PATCH_SENTINEL_FILE}
DEPENDS ${PAIMON_PATCH_TARGET_FILE}
COMMENT "Patching variant file in GCC include directory to fix static_cast issue."
VERBATIM
)
add_custom_target(patch_variant
DEPENDS ${PAIMON_PATCH_SENTINEL_FILE}
COMMENT "Variant patch applied."
)
add_dependencies(ExecPaimon patch_variant gen-deps)
else()
message(STATUS "skip apply patch since hash is changed.")
add_dependencies(ExecPaimon gen-deps)
endif()


@@ -0,0 +1,315 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/paimon/paimon-jni-row-reader.h"
#include <jni.h>
#include <cstdint>
#include <memory>
#include <string_view>
#include <type_traits>
#include <arrow/array/array_base.h>
#include <arrow/array/array_binary.h>
#include <arrow/type_fwd.h>
#include <glog/logging.h>
#include <arrow/api.h>
#include <arrow/c/bridge.h>
#include "common/global-types.h"
#include "exec/exec-node.inline.h"
#include "exec/parquet/parquet-common.h"
#include "exec/read-write-util.h"
#include "gutil/walltime.h"
#include "runtime/collection-value-builder.h"
#include "runtime/decimal-value.h"
#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tuple-row.h"
#include "util/jni-util.h"
namespace impala {
PaimonJniRowReader::PaimonJniRowReader() {}
Status PaimonJniRowReader::MaterializeTuple(const arrow::RecordBatch& recordBatch,
const int row_index, const TupleDescriptor* tuple_desc, Tuple* tuple,
MemPool* tuple_data_pool, RuntimeState* state) {
DCHECK(tuple != nullptr);
DCHECK(tuple_data_pool != nullptr);
DCHECK(tuple_desc != nullptr);
int col = 0;
DCHECK(recordBatch.num_columns() == tuple_desc->slots().size());
for (const SlotDescriptor* slot_desc : tuple_desc->slots()) {
std::shared_ptr<arrow::Array> arr = recordBatch.column(col);
DCHECK(arr != nullptr);
RETURN_IF_ERROR(
WriteSlot(arr.get(), row_index, slot_desc, tuple, tuple_data_pool, state));
col++;
}
return Status::OK();
}
template <typename T, typename AT>
Status PaimonJniRowReader::WriteSlot(const AT* arrow_array, int row_idx, void* slot) {
T value = arrow_array->Value(row_idx);
*reinterpret_cast<T*>(slot) = value;
return Status::OK();
}
template <typename T, typename AT>
Status PaimonJniRowReader::CastAndWriteSlot(
const arrow::Array* arrow_array, const int row_idx, void* slot) {
auto derived_array = static_cast<const AT*>(arrow_array);
return WriteSlot<T, AT>(derived_array, row_idx, slot);
}
Status PaimonJniRowReader::WriteSlot(const arrow::Array* array, int row_index,
const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
RuntimeState* state) {
if (array->IsNull(row_index)) {
tuple->SetNull(slot_desc->null_indicator_offset());
return Status::OK();
}
void* slot = tuple->GetSlot(slot_desc->tuple_offset());
const ColumnType& type = slot_desc->type();
switch (type.type) {
case TYPE_CHAR: {
RETURN_IF_ERROR(WriteVarCharOrCharSlot</* IS_CHAR */ true>(
array, row_index, slot_desc->type().len, slot, tuple_data_pool));
break;
}
case TYPE_STRING:
case TYPE_VARCHAR: {
if (type.IsBinaryType()) { // byte[]
RETURN_IF_ERROR(WriteStringOrBinarySlot</* IS_BINARY */ true>(
array, row_index, slot, tuple_data_pool));
} else {
RETURN_IF_ERROR(WriteStringOrBinarySlot</* IS_BINARY */ false>(
array, row_index, slot, tuple_data_pool));
}
break;
}
case TYPE_BOOLEAN: {
RETURN_IF_ERROR(
(CastAndWriteSlot<bool, arrow::BooleanArray>(array, row_index, slot)));
break;
}
case TYPE_DATE: {
RETURN_IF_ERROR(WriteDateSlot(array, row_index, slot));
break;
}
case TYPE_TINYINT: {
RETURN_IF_ERROR(
(CastAndWriteSlot<int8_t, arrow::Int8Array>(array, row_index, slot)));
break;
}
case TYPE_SMALLINT: {
RETURN_IF_ERROR(
(CastAndWriteSlot<int16_t, arrow::Int16Array>(array, row_index, slot)));
break;
}
case TYPE_INT: {
RETURN_IF_ERROR(
(CastAndWriteSlot<int32_t, arrow::Int32Array>(array, row_index, slot)));
break;
}
case TYPE_BIGINT: {
RETURN_IF_ERROR(
(CastAndWriteSlot<int64_t, arrow::Int64Array>(array, row_index, slot)));
break;
}
case TYPE_FLOAT: {
RETURN_IF_ERROR(
(CastAndWriteSlot<float, arrow::FloatArray>(array, row_index, slot)));
break;
}
case TYPE_DOUBLE: {
RETURN_IF_ERROR(
(CastAndWriteSlot<double, arrow::DoubleArray>(array, row_index, slot)));
break;
}
case TYPE_DECIMAL: {
RETURN_IF_ERROR(WriteDecimalSlot(array, row_index, type, slot));
break;
}
case TYPE_TIMESTAMP: {
RETURN_IF_ERROR(
WriteTimeStampSlot(array, state->local_time_zone(), row_index, slot));
break;
}
case TYPE_STRUCT: {
// TODO: implement struct type support later
tuple->SetNull(slot_desc->null_indicator_offset());
break;
}
case TYPE_ARRAY: {
// TODO: implement array type support later
tuple->SetNull(slot_desc->null_indicator_offset());
break;
}
case TYPE_MAP: {
// TODO: implement map type support later
tuple->SetNull(slot_desc->null_indicator_offset());
break;
}
default:
DCHECK(false) << "Unsupported column type: " << slot_desc->type().type;
tuple->SetNull(slot_desc->null_indicator_offset());
}
return Status::OK();
}
Status PaimonJniRowReader::WriteDateSlot(
const arrow::Array* array, const int row_idx, void* slot) {
const arrow::Date32Array* date_array = static_cast<const arrow::Date32Array*>(array);
int32_t days_since_epoch = date_array->Value(row_idx);
// This will set the value to DateValue::INVALID_DAYS_SINCE_EPOCH if it is out of
// range.
DateValue result(days_since_epoch);
*reinterpret_cast<int32_t*>(slot) = result.Value();
return Status::OK();
}
// Sets the decimal value in the slot. Inline method to avoid nested switch statements.
static const std::string ERROR_INVALID_DECIMAL = "Invalid Decimal Format";
inline Status SetDecimalVal(
const ColumnType& type, const uint8_t* buffer, int len, void* slot) {
switch (type.GetByteSize()) {
case 4: {
Decimal4Value* val = reinterpret_cast<Decimal4Value*>(slot);
if (UNLIKELY(
(ParquetPlainEncoder::Decode<Decimal4Value,
parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len, val))
< 0)) {
return Status(ERROR_INVALID_DECIMAL);
}
break;
}
case 8: {
Decimal8Value* val = reinterpret_cast<Decimal8Value*>(slot);
if (UNLIKELY(
(ParquetPlainEncoder::Decode<Decimal8Value,
parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len, val))
< 0)) {
return Status(ERROR_INVALID_DECIMAL);
}
break;
}
case 16: {
Decimal16Value* val = reinterpret_cast<Decimal16Value*>(slot);
if (UNLIKELY(
(ParquetPlainEncoder::Decode<Decimal16Value,
parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer + len, len, val))
< 0)) {
return Status(ERROR_INVALID_DECIMAL);
}
break;
}
default:
DCHECK(false);
}
return Status::OK();
}
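// The Java scanner ships each decimal as a variable-length binary value,
// assumed here to hold the big-endian two's-complement unscaled integer
// (the same layout Parquet uses for FIXED_LEN_BYTE_ARRAY decimals), which
// is why the Parquet plain decoder above can be reused as-is.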
Status PaimonJniRowReader::WriteDecimalSlot(
const arrow::Array* array, const int row_idx, const ColumnType& type, void* slot) {
const arrow::BinaryArray* binary_array = static_cast<const arrow::BinaryArray*>(array);
int byte_length = 0;
const uint8_t* data = binary_array->GetValue(row_idx, &byte_length);
DCHECK(byte_length > 0 && byte_length <= 16);
return SetDecimalVal(type, data, byte_length, slot);
}
Status PaimonJniRowReader::WriteTimeStampSlot(
const arrow::Array* array, const Timezone* timezone, const int row_idx, void* slot) {
const arrow::TimestampArray* ts_array =
static_cast<const arrow::TimestampArray*>(array);
int64_t value = ts_array->Value(row_idx);
const auto& type = static_cast<const arrow::TimestampType&>(*ts_array->type());
const std::string& tz_name = type.timezone();
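// When the arrow type carries no timezone string, the value is interpreted
// in UTC; otherwise the instant is converted to the query's local time zone.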
const Timezone* tz = tz_name.empty() ? UTCPTR : timezone;
switch (type.unit()) {
case ::arrow::TimeUnit::NANO:
*reinterpret_cast<TimestampValue*>(slot) = TimestampValue::FromUnixTimeNanos(
value / NANOS_PER_SEC, value % NANOS_PER_SEC, tz);
break;
case ::arrow::TimeUnit::MICRO:
*reinterpret_cast<TimestampValue*>(slot) =
TimestampValue::FromUnixTimeMicros(value, tz);
break;
case ::arrow::TimeUnit::MILLI:
*reinterpret_cast<TimestampValue*>(slot) =
TimestampValue::FromUnixTimeMicros(value * 1000L, tz);
break;
case ::arrow::TimeUnit::SECOND:
*reinterpret_cast<TimestampValue*>(slot) = TimestampValue::FromUnixTime(value, tz);
break;
}
return Status::OK();
}
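/// Writes a CHAR(N) value in place into the fixed-length slot: copies at
/// most 'dst_len' bytes from the arrow string and pads the rest with spaces.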
template <bool IS_CHAR>
Status PaimonJniRowReader::WriteVarCharOrCharSlot(const arrow::Array* array,
const int row_idx, int dst_len, void* slot, MemPool* tuple_data_pool) {
const arrow::StringArray* nchar_array = static_cast<const arrow::StringArray*>(array);
std::string_view v = nchar_array->Value(row_idx);
int src_len = v.size();
int unpadded_len = std::min<int>(dst_len, src_len);
// Copy the bytes from the JVM into the fixed-length CHAR slot.
char* dst_char = reinterpret_cast<char*>(slot);
memcpy(dst_char, v.data(), unpadded_len);
StringValue::PadWithSpaces(dst_char, dst_len, unpadded_len);
return Status::OK();
}
/// Obtains the bytes from the arrow string/binary array first; the data then
/// has to be copied into the tuple_data_pool, because the FE PaimonJniScanner
/// releases the JVM offheap memory later.
template <bool IS_BINARY>
Status PaimonJniRowReader::WriteStringOrBinarySlot(
const arrow::Array* array, const int row_idx, void* slot, MemPool* tuple_data_pool) {
std::string_view v;
uint32_t jbuffer_size = 0;
if constexpr (IS_BINARY) {
const arrow::BinaryArray* binary_array =
static_cast<const arrow::BinaryArray*>(array);
v = binary_array->Value(row_idx);
} else {
const arrow::StringArray* string_array =
static_cast<const arrow::StringArray*>(array);
v = string_array->Value(row_idx);
}
jbuffer_size = v.size();
// Allocate memory and copy the bytes from the JVM to the RowBatch.
char* buffer =
reinterpret_cast<char*>(tuple_data_pool->TryAllocateUnaligned(jbuffer_size));
if (UNLIKELY(buffer == nullptr)) {
string details = strings::Substitute("Failed to allocate $0 bytes for $1.",
jbuffer_size, IS_BINARY ? "binary" : "string");
return tuple_data_pool->mem_tracker()->MemLimitExceeded(
nullptr, details, jbuffer_size);
}
memcpy(buffer, v.data(), jbuffer_size);
reinterpret_cast<StringValue*>(slot)->Assign(buffer, jbuffer_size);
return Status::OK();
}
} // namespace impala


@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/global-types.h"
#include "common/status.h"
#include "runtime/types.h"
#include <arrow/record_batch.h>
namespace impala {
class MemPool;
class RuntimeState;
class ScanNode;
class Status;
class SlotDescriptor;
class Tuple;
class TupleDescriptor;
/// Row reader for Paimon table scans. It translates a row of an arrow
/// RecordBatch into Impala row batch tuples.
class PaimonJniRowReader {
public:
PaimonJniRowReader();
/// Materialize the Arrow batch into Impala rows.
Status MaterializeTuple(const arrow::RecordBatch& recordBatch, const int row_idx,
const TupleDescriptor* tuple_desc, Tuple* tuple, MemPool* tuple_data_pool,
RuntimeState* state);
private:
// Writes one slot value of the row denoted by 'row_idx' from an arrow array
// into the target tuple.
Status WriteSlot(const arrow::Array* array, const int row_idx,
const SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool,
RuntimeState* state) WARN_UNUSED_RESULT;
/// Template method that writes a value from 'arrow_array' to 'slot'.
/// 'T' is the type of the slot,
/// 'AT' is the proper subtype of the arrow::Array.
template <typename T, typename AT>
Status WriteSlot(const AT* arrow_array, int row_idx, void* slot) WARN_UNUSED_RESULT;
template <typename T, typename AT>
Status CastAndWriteSlot(
const arrow::Array* arrow_array, const int row_idx, void* slot) WARN_UNUSED_RESULT;
Status WriteDateSlot(
const arrow::Array* array, const int row_idx, void* slot) WARN_UNUSED_RESULT;
Status WriteDecimalSlot(const arrow::Array* array, const int row_idx,
const ColumnType& type, void* slot) WARN_UNUSED_RESULT;
/// Paimon TimeStamp is parsed into TimestampValue.
Status WriteTimeStampSlot(const arrow::Array* array, const Timezone* timezone,
const int row_idx, void* slot) WARN_UNUSED_RESULT;
template <bool IS_BINARY>
Status WriteStringOrBinarySlot(const arrow::Array* array, const int row_idx, void* slot,
MemPool* tuple_data_pool) WARN_UNUSED_RESULT;
template <bool IS_CHAR>
Status WriteVarCharOrCharSlot(const arrow::Array* array, const int row_idx, int max_len,
void* slot, MemPool* tuple_data_pool) WARN_UNUSED_RESULT;
};
} // namespace impala


@@ -0,0 +1,267 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/paimon/paimon-jni-scan-node.h"
#include "common/status.h"
#include "exec/exec-node-util.h"
#include "exec/exec-node.inline.h"
#include "exec/paimon/paimon-jni-row-reader.h"
#include "exec/paimon/paimon-jni-scanner.h"
#include "exec/paimon/paimon-scan-plan-node.h"
#include "rpc/thrift-util.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
#include "util/jni-util.h"
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
#include <jni.h>
#include <vector>
#include <arrow/c/bridge.h>
#include <glog/logging.h>
using namespace impala;
PaimonJniScanNode::PaimonJniScanNode(
ObjectPool* pool, const PaimonScanPlanNode& pnode, const DescriptorTbl& descs)
: ScanNode(pool, pnode, descs),
tuple_id_(pnode.tnode_->paimon_table_scan_node.tuple_id),
table_name_(pnode.tnode_->paimon_table_scan_node.table_name),
splits_empty_(false),
paimon_last_arrow_record_batch_consumed_bytes_(0),
arrow_record_batch_row_count_(0),
arrow_record_batch_row_index_(0) {}
Status PaimonJniScanNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanNode::Prepare(state));
DCHECK(scan_range_params_ != NULL)
<< "Must call SetScanRanges() before calling Prepare()";
scan_open_timer_ = ADD_TIMER(runtime_profile(), "ScanOpenTime");
paimon_api_scan_timer_ = ADD_TIMER(runtime_profile(), "PaimonApiScanTime");
tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_);
if (tuple_desc_ == nullptr) {
return Status(
"Failed to get tuple descriptor, tuple id: " + std::to_string(tuple_id_));
}
arrow_batch_mem_tracker_.reset(
new MemTracker(-1, "Arrow Batch", this->mem_tracker(), false));
/// Construct the jni scan param, the param will be used in PaimonJniScanner.
paimon_jni_scan_param_.__set_paimon_table_obj(
plan_node_.tnode_->paimon_table_scan_node.paimon_table_obj);
/// Update the projection: collect the top-level field ids of the tuple.
std::vector<int32_t> field_ids;
RETURN_IF_ERROR(CollectProjectionFieldIds(tuple_desc_, field_ids));
paimon_jni_scan_param_.__set_projection(field_ids);
LOG(INFO) << table_name_ << " Contains " << field_ids.size() << " field ids."
<< std::endl;
paimon_jni_scan_param_.__set_mem_limit_bytes(
arrow_batch_mem_tracker_->GetLowestLimit(MemLimit::HARD));
paimon_jni_scan_param_.__set_batch_size(state->batch_size());
paimon_jni_scan_param_.__set_fragment_id(state->fragment_instance_id());
std::vector<std::string> scan_range_vector;
for (const ScanRangeParamsPB& params : *scan_range_params_) {
DCHECK(params.scan_range().has_file_metadata());
const std::string& split = params.scan_range().file_metadata();
scan_range_vector.push_back(split);
}
paimon_jni_scan_param_.__set_splits(scan_range_vector);
/// Check if splits is empty
splits_empty_ = scan_range_vector.empty();
impala::ThriftSerializer serializer(false);
/// serialize the jni scan param to binary.
RETURN_IF_ERROR(serializer.SerializeToString<TPaimonJniScanParam>(
&paimon_jni_scan_param_, &paimon_jni_scan_param_serialized_));
return Status::OK();
}
Status PaimonJniScanNode::Open(RuntimeState* state) {
SCOPED_TIMER(scan_open_timer_);
RETURN_IF_ERROR(ScanNode::Open(state));
/// Skip if splits is empty.
if (splits_empty_) {
return Status::OK();
}
JNIEnv* env = JniUtil::GetJNIEnv();
if (env == nullptr) return Status("Failed to get/create JVM");
jni_scanner_.reset(
new PaimonJniScanner(paimon_jni_scan_param_serialized_, tuple_desc_, table_name_));
RETURN_IF_ERROR(jni_scanner_->Init(env));
paimon_row_reader_.reset(new PaimonJniRowReader());
SCOPED_TIMER(paimon_api_scan_timer_);
RETURN_IF_ERROR(jni_scanner_->ScanTable(env));
return Status::OK();
}
Status PaimonJniScanNode::CollectProjectionFieldIds(
const TupleDescriptor* tuple_desc_, vector<int32_t>& projection) {
for (const SlotDescriptor* slot_desc : tuple_desc_->slots()) {
int field_id = -1;
if (slot_desc->col_path().size() == 1) {
// Top level slots have ColumnDescriptors that store the field ids.
field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
} else {
// TODO: support the nested field later.
return Status("Paimon Scanner currently doesn't support nested type now");
}
DCHECK_NE(field_id, -1);
projection.push_back(field_id);
}
return Status::OK();
}
Status PaimonJniScanNode::GetNextBatchIfNeeded(bool* is_empty_batch) {
/// Check if we need to fetch the next arrow record batch via JNI; if so,
/// fetch it.
if (paimon_arrow_record_batch_holder_ == nullptr
|| arrow_record_batch_row_index_ >= arrow_record_batch_row_count_) {
SCOPED_TIMER(paimon_api_scan_timer_);
JNIEnv* env = JniUtil::GetJNIEnv();
struct ArrowArray* array;
struct ArrowSchema* schema;
long row_count;
long offheap_consumed_bytes;
DCHECK(is_empty_batch != nullptr);
RETURN_IF_ERROR(jni_scanner_->GetNextBatchDirect(
env, &array, &schema, &row_count, &offheap_consumed_bytes));
*is_empty_batch = false;
if (row_count > 0) {
auto resultImportVectorSchemaRoot = arrow::ImportRecordBatch(array, schema);
/// since the result type is arrow::Status, need to check status manually
/// without using macro RETURN_IF_ERROR
if (!resultImportVectorSchemaRoot.ok()) {
return Status(resultImportVectorSchemaRoot.status().message());
}
paimon_arrow_record_batch_holder_ = resultImportVectorSchemaRoot.ValueUnsafe();
arrow_record_batch_row_count_ = row_count;
DCHECK_EQ(row_count, paimon_arrow_record_batch_holder_->num_rows());
arrow_record_batch_row_index_ = 0;
/// update allocated offheap memory reported from Jni.
/// need to do for each batch since we need to check the
/// memory usage hit the limit of mem tracker.
OffheapTrackFree();
RETURN_IF_ERROR(OffheapTrackAllocation(offheap_consumed_bytes));
} else {
/// No more batches to fetch.
paimon_arrow_record_batch_holder_ = nullptr;
arrow_record_batch_row_count_ = 0;
arrow_record_batch_row_index_ = 0;
OffheapTrackFree();
}
}
*is_empty_batch = arrow_record_batch_row_count_ > 0
&& arrow_record_batch_row_index_ < arrow_record_batch_row_count_;
return Status::OK();
}
Status PaimonJniScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
ScopedGetNextEventAdder ea(this, eos);
RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(QueryMaintenance(state));
/// Return eos if empty splits or reached limit.
if (splits_empty_ || ReachedLimit()) {
*eos = true;
return Status::OK();
}
/// Set eos to false initially for the batch.
*eos = false;
/// Allocate buffer for RowBatch and init the tuple
uint8_t* tuple_buffer;
int64_t tuple_buffer_size;
RETURN_IF_ERROR(
row_batch->ResizeAndAllocateTupleBuffer(state, &tuple_buffer_size, &tuple_buffer));
Tuple* tuple = reinterpret_cast<Tuple*>(tuple_buffer);
tuple->Init(tuple_buffer_size);
SCOPED_TIMER(materialize_tuple_timer());
while (!ReachedLimit() && !row_batch->AtCapacity()) {
/// Break the loop if the query is cancelled or query maintenance fails.
if (state->is_cancelled() || !QueryMaintenance(state).ok()) {
break;
}
int row_idx = row_batch->AddRow();
TupleRow* tuple_row = row_batch->GetRow(row_idx);
tuple_row->SetTuple(0, tuple);
/// Get the next arrow batch from 'org.apache.impala.util.paimon.PaimonJniScanner'
/// if the current arrow batch is already consumed.
bool fetched;
RETURN_IF_ERROR(GetNextBatchIfNeeded(&fetched));
/// When fetched is false, there are no more arrow batches to read
if (!fetched) {
*eos = true;
break;
}
DCHECK(paimon_arrow_record_batch_holder_ != nullptr);
COUNTER_ADD(rows_read_counter(), 1);
RETURN_IF_ERROR(paimon_row_reader_->MaterializeTuple(
*paimon_arrow_record_batch_holder_, arrow_record_batch_row_index_, tuple_desc_,
tuple, row_batch->tuple_data_pool(), state));
/// Evaluate conjuncts on this tuple row
if (ExecNode::EvalConjuncts(
conjunct_evals().data(), conjunct_evals().size(), tuple_row)) {
row_batch->CommitLastRow();
tuple = reinterpret_cast<Tuple*>(
reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
IncrementNumRowsReturned(1);
} else {
/// Reset the null bits; everything else will be overwritten.
Tuple::ClearNullBits(
tuple, tuple_desc_->null_bytes_offset(), tuple_desc_->num_null_bytes());
}
/// will process the next row of arrow RecordBatch in next iteration.
arrow_record_batch_row_index_++;
}
if (ReachedLimit()) {
*eos = true;
}
return Status::OK();
}
void PaimonJniScanNode::Close(RuntimeState* state) {
if (is_closed()) return;
/// Eagerly release the arrow batch before calling the frontend close
/// method to avoid a memory leak; a leak was observed when a query was
/// cancelled.
if (paimon_arrow_record_batch_holder_ != nullptr) {
paimon_arrow_record_batch_holder_.reset();
arrow_record_batch_row_index_ = 0;
arrow_record_batch_row_count_ = 0;
}
/// Close the JNI scanner if the splits are not empty.
if (!splits_empty_) {
jni_scanner_->Close(state);
}
OffheapTrackFree();
if (arrow_batch_mem_tracker_ != nullptr) {
arrow_batch_mem_tracker_->Close();
}
ScanNode::Close(state);
}


@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/global-types.h"
#include "exec/paimon/paimon-jni-row-reader.h"
#include "exec/paimon/paimon-jni-scanner.h"
#include "exec/scan-node.h"
#include "runtime/descriptors.h"
#include <jni.h>
#include <arrow/record_batch.h>
namespace impala {
class ExecNode;
class PaimonJniRowReader;
class RuntimeState;
class Status;
/// Scan node for a Paimon table.
/// Since the Paimon API can be used to scan both Paimon data tables and
/// metadata tables, the current JNI scanner works as a generic solution for
/// scanning both.
/// For scanning these Paimon tables this scanner calls into the JVM and
/// creates a 'PaimonJniScanner' object that does the scanning.
/// To minimize the JNI overhead, the PaimonJniScanner first writes a batch of
/// InternalRows into an arrow RecordBatch in the offheap region, and the
/// offheap memory pointer is passed to the native side, which directly reads
/// the record batch and converts the arrow format into a native Impala
/// RowBatch. Benchmarks show roughly 2x better performance than the pure JNI
/// implementation.
///
/// The flow of scanning is:
/// 1. Backend: Gets the splits/table object from the plan node, generates the
/// thrift-encoded param and passes it to the JNI scanner.
/// 2. Backend: Creates a PaimonJniScanner object on the Java heap.
/// 3. Backend: Triggers a table scan on the Frontend.
/// 4. Frontend: Executes the scan.
/// 5. Backend: Calls GetNext, which calls PaimonJniScanner's GetNext.
/// 6. Frontend: PaimonJniScanner's GetNextBatchDirect returns the offheap
/// pointers to the arrow record batch, as well as the consumed offheap byte
/// size.
/// 7. Backend: Converts the arrow RecordBatch into an Impala RowBatch.
///
/// Note:
/// This scan node can be executed on any executor.
class PaimonScanPlanNode;
class PaimonJniScanNode : public ScanNode {
public:
PaimonJniScanNode(
ObjectPool* pool, const PaimonScanPlanNode& pnode, const DescriptorTbl& descs);
/// Initializes counters and builds the serialized JNI scan parameters.
Status Prepare(RuntimeState* state) override;
/// Creates the JNI scanner and the Paimon row reader, and triggers the scan.
Status Open(RuntimeState* state) override;
/// Fills the next rowbatch with the results returned by the Paimon scan.
Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
/// Finalize and close this operator.
void Close(RuntimeState* state) override;
protected:
Status CollectProjectionFieldIds(
const TupleDescriptor* tuple_desc, vector<int32_t>& projection);
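/// Charges 'bytes' of JVM offheap memory for the current arrow batch against
/// 'arrow_batch_mem_tracker_'; fails if the memory limit would be exceeded.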
Status OffheapTrackAllocation(long bytes);
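/// Releases the offheap charge recorded for the previous arrow batch, if any.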
void OffheapTrackFree();
private:
/// Adapter that helps prepare the table and executes a Paimon table scan
/// on the Java side. Allows the ScanNode to fetch the Arrow RecordBatch
/// from the JVM offheap region.
std::unique_ptr<PaimonJniScanner> jni_scanner_;
/// Helper class to transform Paimon rows to Impala tuples.
std::unique_ptr<PaimonJniRowReader> paimon_row_reader_;
/// Gets the next arrow record batch from the JNI scanner.
/// Sets the out-param to false if no more records are available, and to
/// true if a valid row can be read.
Status GetNextBatchIfNeeded(bool* is_empty_batch);
/// The TupleId and TupleDescriptor of the tuple that this scan node will populate.
const TupleId tuple_id_;
const TupleDescriptor* tuple_desc_ = nullptr;
/// The table name.
const std::string table_name_;
// Paimon scan param.
TPaimonJniScanParam paimon_jni_scan_param_;
/// Indicates whether the splits are empty. Used to control whether the
/// JNI scanner should be created: if the splits are empty, all JNI-related
/// operations are skipped, the eos flag is set to true directly, and an
/// empty scan result is returned.
bool splits_empty_;
/// MemTracker for tracking the JVM offheap memory used by arrow batches.
/// Initialized in Prepare().
std::unique_ptr<MemTracker> arrow_batch_mem_tracker_;
/// Offheap bytes consumed by the last arrow record batch.
long paimon_last_arrow_record_batch_consumed_bytes_;
/// Thrift-serialized Paimon scan param.
std::string paimon_jni_scan_param_serialized_;
/// Current unconsumed arrow record batch.
std::shared_ptr<arrow::RecordBatch> paimon_arrow_record_batch_holder_;
/// Row count of the current arrow record batch.
long arrow_record_batch_row_count_;
/// Current row index within the arrow record batch.
long arrow_record_batch_row_index_;
/// Paimon scan specific counters.
RuntimeProfile::Counter* scan_open_timer_;
RuntimeProfile::Counter* paimon_api_scan_timer_;
};
inline Status PaimonJniScanNode::OffheapTrackAllocation(long consumed_bytes) {
if (consumed_bytes > 0) {
if (arrow_batch_mem_tracker_->TryConsume(consumed_bytes)) {
paimon_last_arrow_record_batch_consumed_bytes_ = consumed_bytes;
return Status::OK();
} else {
return Status::MemLimitExceeded("Arrow batch size exceeds the mem limit.");
}
} else {
return Status::OK();
}
}
inline void PaimonJniScanNode::OffheapTrackFree() {
if (paimon_last_arrow_record_batch_consumed_bytes_ > 0) {
arrow_batch_mem_tracker_->Release(paimon_last_arrow_record_batch_consumed_bytes_);
paimon_last_arrow_record_batch_consumed_bytes_ = 0;
}
}
} // namespace impala


@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/paimon/paimon-jni-scanner.h"
#include <jni.h>
#include "util/jni-util.h"
namespace impala {
PaimonJniScanner::PaimonJniScanner(const std::string& scan_param,
const TupleDescriptor* tuple_desc, const std::string& table_name)
: paimon_scan_param_(scan_param), tuple_desc_(tuple_desc), table_name_(table_name) {}
Status PaimonJniScanner::InitJNI() {
DCHECK(paimon_jni_scanner_cl_ == nullptr) << "InitJNI() already called!";
JNIEnv* env = JniUtil::GetJNIEnv();
if (env == nullptr) return Status("Failed to get/create JVM");
// Global class references:
RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(
env, "org/apache/impala/util/paimon/PaimonJniScanner", &paimon_jni_scanner_cl_));
// Method ids:
RETURN_IF_ERROR(JniUtil::GetMethodID(
env, paimon_jni_scanner_cl_, "<init>", "([B)V", &paimon_jni_scanner_ctor_));
RETURN_IF_ERROR(JniUtil::GetMethodID(
env, paimon_jni_scanner_cl_, "ScanTable", "()V", &paimon_jni_scanner_scan_table_));
RETURN_IF_ERROR(JniUtil::GetMethodID(env, paimon_jni_scanner_cl_, "GetNextBatch",
"([J)J", &paimon_jni_scanner_get_next_));
RETURN_IF_ERROR(JniUtil::GetMethodID(
env, paimon_jni_scanner_cl_, "close", "()V", &paimon_jni_scanner_close_));
return Status::OK();
}
Status PaimonJniScanner::Init(JNIEnv* env) {
jbyteArray jbytes_scan_param = env->NewByteArray(paimon_scan_param_.size());
RETURN_ERROR_IF_EXC(env);
env->SetByteArrayRegion(jbytes_scan_param, 0, paimon_scan_param_.size(),
reinterpret_cast<const jbyte*>(paimon_scan_param_.data()));
RETURN_ERROR_IF_EXC(env);
jobject j_jni_scanner =
env->NewObject(paimon_jni_scanner_cl_, paimon_jni_scanner_ctor_, jbytes_scan_param);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, j_jni_scanner, &j_jni_scanner_));
return Status::OK();
}
Status PaimonJniScanner::ScanTable(JNIEnv* env) {
env->CallObjectMethod(j_jni_scanner_, paimon_jni_scanner_scan_table_);
RETURN_ERROR_IF_EXC(env);
return Status::OK();
}
Status PaimonJniScanner::GetNextBatchDirect(JNIEnv* env, struct ArrowArray** array,
struct ArrowSchema** schema, long* rows, long* offheap_used) {
/// Passes a long array to the Java method, which fills in three values:
/// the ArrowSchema memory address, the ArrowArray memory address (both in
/// the offheap region of the JVM), and the offheap usage in bytes. The
/// return value of the Java method is the row count.
jlongArray address_array = env->NewLongArray(3);
RETURN_ERROR_IF_EXC(env);
jlong result =
env->CallLongMethod(j_jni_scanner_, paimon_jni_scanner_get_next_, address_array);
RETURN_ERROR_IF_EXC(env);
jlong values[3];
env->GetLongArrayRegion(address_array, 0, 3, &values[0]);
*schema = (struct ArrowSchema*)values[0];
*array = (struct ArrowArray*)values[1];
*offheap_used = values[2];
*rows = result;
env->DeleteLocalRef(address_array);
return Status::OK();
}
void PaimonJniScanner::Close(RuntimeState* state) {
JNIEnv* env = JniUtil::GetJNIEnv();
if (env != nullptr) {
if (j_jni_scanner_ != nullptr) {
/// Call close method to free resources of java PaimonJniScanner.
env->CallObjectMethod(j_jni_scanner_, paimon_jni_scanner_close_);
env->DeleteGlobalRef(j_jni_scanner_);
}
}
}
string PaimonJniScanner::DebugString() {
std::stringstream out;
out << "PaimonJniScanner: [ Paimon table name: " << table_name_ << "; ";
out << tuple_desc_->DebugString() << "]";
return out.str();
}
} // namespace impala


@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/status.h"
#include "runtime/descriptors.h"
#include <jni.h>
#include <string>
#include <arrow/c/bridge.h>
namespace impala {
class RuntimeState;
/// Adapter class of the FE PaimonJniScanner, wraps the JNI calls as C++ methods.
class PaimonJniScanner {
public:
PaimonJniScanner(const std::string& scan_param, const TupleDescriptor* tuple_desc,
const std::string& table_name);
/// JNI setup. Creates global references for Java classes and finds method ids.
/// Initializes static members, should be called once per process lifecycle.
static Status InitJNI() WARN_UNUSED_RESULT;
// Initializes this object and creates the Java PaimonJniScanner object.
Status Init(JNIEnv* env) WARN_UNUSED_RESULT;
/// Executes a Paimon scan through JNI.
Status ScanTable(JNIEnv* env) WARN_UNUSED_RESULT;
/// Gets the next arrow batch from 'org.apache.impala.util.paimon.PaimonJniScanner'.
Status GetNextBatchDirect(JNIEnv* env, struct ArrowArray** array,
struct ArrowSchema** schema, long* rows, long* offheap_used) WARN_UNUSED_RESULT;
/// Removes global references.
void Close(RuntimeState* state);
private:
/// Global class references created with JniUtil.
inline static jclass paimon_jni_scanner_cl_ = nullptr;
/// Method references created with JniUtil.
inline static jmethodID paimon_jni_scanner_ctor_ = nullptr;
inline static jmethodID paimon_jni_scanner_scan_table_ = nullptr;
inline static jmethodID paimon_jni_scanner_get_next_ = nullptr;
inline static jmethodID paimon_jni_scanner_close_ = nullptr;
/// The Paimon table scan parameters.
const std::string& paimon_scan_param_;
/// Top level TupleDescriptor.
const TupleDescriptor* tuple_desc_;
/// metastore table name
const std::string& table_name_;
/// Paimon scanner Java object. It helps prepare the table and executes a
/// Paimon table scan, allowing the ScanNode to fetch row batches from the
/// JVM offheap region.
jobject j_jni_scanner_;
std::string DebugString();
};
} // namespace impala


@@ -0,0 +1,31 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "common/status.h"
#include "exec/paimon/paimon-scan-plan-node.h"
#include "exec/paimon/paimon-jni-scan-node.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
#include "util/jni-util.h"
using namespace impala;
Status PaimonScanPlanNode::CreateExecNode(RuntimeState* state, ExecNode** node) const {
ObjectPool* pool = state->obj_pool();
*node = pool->Add(new PaimonJniScanNode(pool, *this, state->desc_tbl()));
return Status::OK();
}


@@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <jni.h>
#include "exec/scan-node.h"
namespace impala {
class ExecNode;
class RuntimeState;
class ScanPlanNode;
class Status;
class PaimonScanPlanNode : public ScanPlanNode {
public:
Status CreateExecNode(RuntimeState* state, ExecNode** node) const override;
~PaimonScanPlanNode() {}
};
} // namespace impala


@@ -368,6 +368,16 @@ string SystemTableDescriptor::DebugString() const {
return out.str();
}
PaimonTableDescriptor::PaimonTableDescriptor(const TTableDescriptor& tdesc)
: TableDescriptor(tdesc), paimon_api_table_(tdesc.paimonTable) {}
string PaimonTableDescriptor::DebugString() const {
stringstream out;
out << "PaimonTable(" << TableDescriptor::DebugString();
out << ")";
return out.str();
}
TupleDescriptor::TupleDescriptor(const TTupleDescriptor& tdesc)
: id_(tdesc.id),
byte_size_(tdesc.byteSize),
@@ -626,6 +636,9 @@ Status DescriptorTbl::CreateTblDescriptorInternal(const TTableDescriptor& tdesc,
case TTableType::SYSTEM_TABLE:
*desc = pool->Add(new SystemTableDescriptor(tdesc));
break;
case TTableType::PAIMON_TABLE:
*desc = pool->Add(new PaimonTableDescriptor(tdesc));
break;
default:
DCHECK(false) << "invalid table type: " << tdesc.tableType;
}


@@ -604,6 +604,16 @@ class SystemTableDescriptor : public TableDescriptor {
TSystemTableName::type table_name_;
};
// Descriptor for a Paimon Table
class PaimonTableDescriptor : public TableDescriptor {
public:
PaimonTableDescriptor(const TTableDescriptor& tdesc);
virtual std::string DebugString() const;
private:
impala::TPaimonTable paimon_api_table_;
};
class TupleDescriptor {
public:
int byte_size() const { return byte_size_; }


@@ -70,7 +70,7 @@ static const string SCHEDULER_WARNING_KEY("Scheduler Warning");
static const vector<TPlanNodeType::type> SCAN_NODE_TYPES{TPlanNodeType::HDFS_SCAN_NODE,
TPlanNodeType::HBASE_SCAN_NODE, TPlanNodeType::DATA_SOURCE_NODE,
TPlanNodeType::KUDU_SCAN_NODE, TPlanNodeType::ICEBERG_METADATA_SCAN_NODE,
TPlanNodeType::SYSTEM_TABLE_SCAN_NODE};
TPlanNodeType::SYSTEM_TABLE_SCAN_NODE, TPlanNodeType::PAIMON_SCAN_NODE};
// Consistent scheduling requires picking up to k distinct candidates out of n nodes.
// Since each iteration can pick a node that it already picked (i.e. it is sampling with


@@ -31,6 +31,7 @@
#include "exec/hbase/hbase-table-writer.h"
#include "exec/iceberg-metadata/iceberg-metadata-scanner.h"
#include "exec/iceberg-metadata/iceberg-row-reader.h"
#include "exec/paimon/paimon-jni-scanner.h"
#include "exprs/hive-udf-call.h"
#include "exprs/timezone_db.h"
#include "gen-cpp/ImpalaService.h"
@@ -68,6 +69,7 @@ int ImpaladMain(int argc, char** argv) {
ABORT_IF_ERROR(HBaseTableWriter::InitJNI());
ABORT_IF_ERROR(IcebergMetadataScanner::InitJNI());
ABORT_IF_ERROR(IcebergRowReader::InitJNI());
ABORT_IF_ERROR(PaimonJniScanner::InitJNI());
ABORT_IF_ERROR(HiveUdfCall::InitEnv());
ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI());
InitFeSupport();