IMPALA-14472: Add create/read support for ARRAY column of Kudu

Initial implementation of KUDU-1261 (array column type) recently merged
in upstream Apache Kudu repository. This patch add initial Impala
support for working with Kudu tables having array type columns.

Unlike rows, the elements of a Kudu array are stored in a different
format than Impala. Instead of per-row bit flag for NULL info, values
and NULL bits are stored in separate arrays.

The following types of queries are not supported in this patch:
- (IMPALA-14538) Queries that reference an array column as a table, e.g.
  ```sql
  SELECT item FROM kudu_array.array_int;
  ```
- (IMPALA-14539) Queries that create duplicate collection slots, e.g.
  ```sql
  SELECT array_int FROM kudu_array AS t, t.array_int AS unnested;
  ```

Testing:
- Add some FE tests in AnalyzeDDLTest and AnalyzeKuduDDLTest.
- Add EE test test_kudu.py::TestKuduArray.
  Since Impala does not support inserting complex types, including
  array, the data insertion part of the test is achieved through
  custom C++ code kudu-array-inserter.cc that insert into Kudu via
  Kudu C++ client. It would be great if we could migrate it to Python so
  that it can be moved to the same file as the test (IMPALA-14537).
- Pass core tests.

Co-authored-by: Riza Suminto

Change-Id: I9282aac821bd30668189f84b2ed8fff7047e7310
Reviewed-on: http://gerrit.cloudera.org:8080/23493
Reviewed-by: Alexey Serbin <alexey@apache.org>
Reviewed-by: Michael Smith <michael.smith@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Xuebin Su
2025-09-17 17:06:39 +08:00
committed by Riza Suminto
parent 671a7fcada
commit 6b6f7e614d
20 changed files with 784 additions and 96 deletions

View File

@@ -37,3 +37,9 @@ add_library(ExecKudu
)
add_dependencies(ExecKudu gen-deps)
# kudu-array-inserter is used in tests/query_test/test_kudu.py::TestKuduArray.
# TODO(IMPALA-14537): Implement kudu-array-inserter using Kudu's Python API.
add_executable(kudu-array-inserter kudu-array-inserter.cc)
target_link_libraries(kudu-array-inserter ${IMPALA_LINK_LIBS})

View File

@@ -0,0 +1,192 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <iostream>
#include <limits>
#include <memory>
#include <kudu/client/client.h>
#include <kudu/client/write_op.h>
#include "gutil/stl_util.h"
#include "util/kudu-status-util.h"
using kudu::Slice;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;
using std::cerr;
using std::endl;
namespace impala {
// Utility program to facilitate complex column insertion into Kudu table.
// This script specifically tailored to insert into Kudu table with following columns:
// (
// id TINYINT PRIMARY KEY,
// array_INT ARRAY<INT>,
// array_TIMESTAMP ARRAY<TIMESTAMP>,
// array_VARCHAR ARRAY<VARCHAR(1)>,
// array_DECIMAL ARRAY<DECIMAL(18,18)>,
// array_DOUBLE ARRAY<DOUBLE>,
// array_BINARY ARRAY<BINARY>,
// array_BOOLEAN ARRAY<BOOLEAN>
// )
//
// The destination table must be empty before this program run.
// Same as in tests/conftest.py
constexpr const char* KUDU_MASTER_DEFAULT_ADDR = "localhost:7051";
const char* KUDU_TEST_TABLE_NAME;
const vector<int32_t> INT32_ARRAY = {
std::numeric_limits<int32_t>::lowest(), -1, std::numeric_limits<int32_t>::max()};
const vector<int64_t> TIMESTAMP_ARRAY = {
-17987443200000000, // See MIN_DATE_AS_UNIX_TIME in be/src/runtime/timestamp-test.cc
-1L,
253402300799999999, // See MAX_DATE_AS_UNIX_TIME in be/src/runtime/timestamp-test.cc
};
// To test multi-byte characters.
const vector<Slice> UTF8_ARRAY = {u8"Σ", u8"π", u8"λ"};
const vector<int64_t> DECIMAL18_ARRAY = {
-999'999'999'999'999'999, // 18 digits
-1L,
999'999'999'999'999'999, // 18 digits
};
// See StringParser::StringToFloatInternal() for how the special values are generated.
const vector<double> DOUBLE_ARRAY = {
-std::numeric_limits<double>::infinity(),
-std::numeric_limits<double>::quiet_NaN(),
std::numeric_limits<double>::infinity()
};
const vector<bool> BOOL_ARRAY = {true, false, true};
// 'id' starts from 0, same as Python's range().
int id = 0;
kudu::Status KuduInsertNulls(
const shared_ptr<KuduSession>& session, const shared_ptr<KuduTable>& table) {
std::unique_ptr<KuduInsert> insert(table->NewInsert());
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetInt8("id", id));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_int"));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_timestamp"));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_varchar"));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_decimal"));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_double"));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_binary"));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetNull("array_boolean"));
KUDU_RETURN_NOT_OK(session->Apply(insert.release()));
++id;
return kudu::Status::OK();
}
// Generates a vector whose length is the same as 'non_null' using the data in 'array'.
template <typename T>
vector<T> repeat(const vector<T>& array, const vector<bool>& non_null) {
vector<T> result;
result.reserve(non_null.size());
for (size_t i = 0UL; i < non_null.size(); ++i) {
result.push_back(array[i % array.size()]);
}
return result;
}
kudu::Status KuduInsertArrays(const shared_ptr<KuduSession>& session,
const shared_ptr<KuduTable>& table, const vector<bool>& non_null) {
std::unique_ptr<KuduInsert> insert(table->NewInsert());
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetInt8("id", id));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayInt32(
"array_int", repeat(INT32_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayUnixTimeMicros(
"array_timestamp", repeat(TIMESTAMP_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayVarchar(
"array_varchar", repeat(UTF8_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayUnscaledDecimal(
"array_decimal", repeat(DECIMAL18_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayDouble(
"array_double", repeat(DOUBLE_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayBinary(
"array_binary", repeat(UTF8_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(insert->mutable_row()->SetArrayBool(
"array_boolean", repeat(BOOL_ARRAY, non_null), non_null));
KUDU_RETURN_NOT_OK(session->Apply(insert.release()));
++id;
return kudu::Status::OK();
}
kudu::Status RunKuduArrayInsert() {
shared_ptr<KuduClient> client;
// Connect to the cluster.
KUDU_RETURN_NOT_OK(KuduClientBuilder()
.add_master_server_addr(KUDU_MASTER_DEFAULT_ADDR)
.Build(&client));
shared_ptr<KuduTable> table;
KUDU_RETURN_NOT_OK(client->OpenTable(KUDU_TEST_TABLE_NAME, &table));
shared_ptr<KuduSession> session = client->NewSession();
KUDU_RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
// The array slot is NULL.
KUDU_RETURN_NOT_OK(KuduInsertNulls(session, table));
// The array is not empty and no element is NULL.
KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, true}));
// The array is empty.
KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {}));
// Array element at the start is NULL.
KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {false, true, true}));
// Array element at the middle is NULL.
KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, false, true}));
// Array element at the end is NULL.
KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, false}));
// The array is longer than those in the previous rows.
KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, true, true, true}));
kudu::Status status = session->Flush();
if (status.ok()) return status;
vector<KuduError*> errors;
ElementDeleter drop(&errors);
bool overflowed;
session->GetPendingErrors(&errors, &overflowed);
for (const KuduError* error : errors) {
cerr << "Error: " << error->status().ToString() << endl;
}
return status;
}
} // namespace impala
int main(int argc, char** argv) {
// Example usage:
// kudu-array-inserter impala::functional_kudu.kudu_array
assert(argc == 2);
impala::KUDU_TEST_TABLE_NAME = argv[1];
kudu::Status status = impala::RunKuduArrayInsert();
if (!status.ok()) {
cerr << "Error: " << status.ToString() << endl;
return 1;
}
return 0;
}

View File

@@ -33,9 +33,11 @@
#include "exprs/scalar-expr.h"
#include "exprs/slot-ref.h"
#include "gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/logging.h"
#include "kudu/util/slice.h"
#include "runtime/collection-value.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
@@ -52,6 +54,7 @@
#include "util/periodic-counter-updater.h"
#include "util/runtime-profile-counters.h"
using kudu::client::KuduArrayCellView;
using kudu::client::KuduClient;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanBatch;
@@ -89,6 +92,23 @@ Status KuduScanner::Open() {
varchar_slots_.push_back(slot);
}
}
// Precompute the element byte size for each array slot since it is stable for the
// whole column.
kudu_array_element_byte_sizes_.assign(
scan_node_->tuple_desc()->collection_slots().size(), 0);
for (int i = 0; i < scan_node_->tuple_desc()->collection_slots().size(); ++i) {
auto slot = scan_node_->tuple_desc()->collection_slots()[i];
// Check the slot type.
DCHECK(slot->type().IsArrayType());
DCHECK_NE(slot->children_tuple_descriptor(), nullptr);
// If the children tuple descriptor contains no slots, we don't need to materialize
// the elements.
if (slot->children_tuple_descriptor()->slots().size() > 0) {
DCHECK_EQ(slot->children_tuple_descriptor()->slots().size(), 1);
SlotDescriptor* item_slot = slot->children_tuple_descriptor()->slots().front();
kudu_array_element_byte_sizes_[i] = GetKuduArrayElementSize(item_slot);
}
}
return ScalarExprEvaluator::Clone(&obj_pool_, state_, expr_perm_pool_.get(),
expr_results_pool_.get(), scan_node_->conjunct_evals(), &conjunct_evals_);
}
@@ -370,6 +390,142 @@ Status KuduScanner::HandleEmptyProjection(RowBatch* row_batch) {
return Status::OK();
}
// Kudu tuples containing TIMESTAMP columns (UNIXTIME_MICROS in Kudu, stored as an
// int64) have 8 bytes of padding following the timestamp. Because this padding is
// provided, Impala can convert these unixtime values to Impala's TimestampValue
// format in place and copy the rows to Impala row batches.
// TODO: avoid mem copies with a Kudu mem 'release' mechanism, attaching mem to the
// batch.
// TODO: consider codegen for this per-timestamp col fixup
Status KuduScanner::ConvertTimestampFromKudu(
Tuple* kudu_tuple, const SlotDescriptor* slot) {
DCHECK(slot->type().type == TYPE_TIMESTAMP);
if (slot->is_nullable() && kudu_tuple->IsNull(slot->null_indicator_offset())) {
return Status::OK();
}
int64_t ts_micros =
*reinterpret_cast<int64_t*>(kudu_tuple->GetSlot(slot->tuple_offset()));
TimestampValue tv;
if (state_->query_options().convert_kudu_utc_timestamps) {
tv = TimestampValue::FromUnixTimeMicros(ts_micros, state_->local_time_zone());
} else {
tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
}
if (tv.HasDateAndTime()) {
RawValue::WriteNonNull<false>(&tv, kudu_tuple, slot, nullptr, nullptr, nullptr);
return Status::OK();
}
kudu_tuple->SetNull(slot->null_indicator_offset());
return Status(ErrorMsg::Init(TErrorCode::KUDU_TIMESTAMP_OUT_OF_RANGE,
scan_node_->table_desc()->table_name(),
scanner_->GetKuduTable()->schema().Column(slot->col_pos()).name()));
}
// Kudu tuples containing VARCHAR columns use characters instead of bytes to limit
// the length. In the case of ASCII values there is no difference. However, if
// multi-byte characters are written to Kudu the length could be longer than allowed.
// This checks the actual length and truncates the value length if it is too long.
// TODO(IMPALA-5675): Remove this when Impala supports UTF-8 character VARCHAR length.
Status KuduScanner::ConvertVarcharFromKudu(
Tuple* kudu_tuple, const SlotDescriptor* slot) {
DCHECK(slot->type().type == TYPE_VARCHAR);
if (slot->is_nullable() && kudu_tuple->IsNull(slot->null_indicator_offset())) {
return Status::OK();
}
StringValue* sv =
reinterpret_cast<StringValue*>(kudu_tuple->GetSlot(slot->tuple_offset()));
int src_len = sv->Len();
int dst_len = slot->type().len;
if (src_len > dst_len) {
sv->SetLen(dst_len);
}
return Status::OK();
}
Status KuduScanner::ConvertArrayFromKudu(Tuple* kudu_tuple, const SlotDescriptor* slot,
MemRange& buffer, MemPool* item_tuple_mem_pool, size_t kudu_array_element_byte_size) {
// Check if the slot is NULL.
if (slot->is_nullable() && kudu_tuple->IsNull(slot->null_indicator_offset())) {
return Status::OK();
}
// Check the Kudu column type.
const auto& kudu_column = scanner_->GetKuduTable()->schema().Column(slot->col_pos());
if (UNLIKELY(kudu_column.nested_type() == nullptr
|| kudu_column.nested_type()->array() == nullptr)) {
return Status(Substitute("Kudu table '$0' column '$1' is not an ARRAY column.",
scan_node_->table_desc()->table_name(), kudu_column.name()));
}
// The slot is not NULL. Get the array value.
auto slice = reinterpret_cast<kudu::Slice*>(kudu_tuple->GetSlot(slot->tuple_offset()));
KuduArrayCellView kudu_array(slice->data(), slice->size());
RETURN_IF_ERROR(FromKuduStatus(kudu_array.Init()));
if (UNLIKELY(kudu_array.elem_num() > INT_MAX)) {
return Status(
Substitute("Kudu array length in table '$0' column '$1' is out of limit.",
scan_node_->table_desc()->table_name(), kudu_column.name()));
}
CollectionValue result;
result.num_tuples = kudu_array.elem_num();
// The data pointer is valid only when the array is not empty.
// If the children tuple descriptor contains no slots, we don't need to materialize
// the elements.
if (kudu_array.elem_num() > 0
&& slot->children_tuple_descriptor()->slots().size() > 0) {
int64_t total_tuple_byte_size =
slot->children_tuple_descriptor()->byte_size() * kudu_array.elem_num();
// buffer.len() is 0 initially before the buffer is allocated.
if (UNLIKELY(buffer.len() < total_tuple_byte_size)) {
buffer = MemRange(
item_tuple_mem_pool->TryAllocate(total_tuple_byte_size), total_tuple_byte_size);
}
if (UNLIKELY(buffer.data() == nullptr)) {
return Status(Substitute(
"Could not allocate memory when reading Kudu ARRAY in table '$0' column '$1'",
scan_node_->table_desc()->table_name(), kudu_column.name()));
}
memset(buffer.data(), 0, total_tuple_byte_size);
result.ptr = buffer.data();
// Check the element type.
DCHECK_EQ(slot->children_tuple_descriptor()->slots().size(), 1);
const SlotDescriptor* item_slot = slot->children_tuple_descriptor()->slots().front();
DCHECK_NE(item_slot, nullptr);
DCHECK(!item_slot->type().IsComplexType());
const auto kudu_elem_type = kudu_column.nested_type()->array()->type();
DCHECK_EQ(KuduDataTypeToColumnType(kudu_elem_type, kudu_column.type_attributes()),
item_slot->type());
// Get the data pointer to access the elements.
auto kudu_array_data = reinterpret_cast<const uint8_t*>(
kudu_array.data(kudu_elem_type, kudu_column.type_attributes()));
DCHECK_NE(kudu_array_data, nullptr);
DCHECK_GT(kudu_array_element_byte_size, 0);
const bool no_null_element = !kudu_array.has_nulls();
for (int i = 0; i < result.num_tuples; ++i) {
Tuple* item_tuple = reinterpret_cast<Tuple*>(
result.ptr + i * slot->children_tuple_descriptor()->byte_size());
// The 'not_null_bitmap()' is valid only when 'has_nulls()' returns true.
if (no_null_element || kudu::BitmapTest(kudu_array.not_null_bitmap(), i)) {
memcpy(item_tuple, kudu_array_data + i * kudu_array_element_byte_size,
kudu_array_element_byte_size);
if (item_slot->type().type == TYPE_TIMESTAMP) {
RETURN_IF_ERROR(ConvertTimestampFromKudu(item_tuple, item_slot));
} else if (item_slot->type().type == TYPE_VARCHAR) {
RETURN_IF_ERROR(ConvertVarcharFromKudu(item_tuple, item_slot));
}
} else {
item_tuple->SetNull(item_slot->null_indicator_offset());
}
}
}
// Copy the result CollectionValue to the slot.
slice->clear();
*reinterpret_cast<CollectionValue*>(slice) = result;
return Status::OK();
}
Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_mem) {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
// Short-circuit for empty projection cases.
@@ -382,61 +538,39 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
bool has_conjuncts = !conjunct_evals_.empty();
int num_rows = cur_kudu_batch_.NumRows();
// MemPool for the item tuples if the result tuple contains collection slots.
MemPool item_tuple_mem_pool(scan_node_->mem_tracker());
// Buffer to hold the item tuples for each collection slot.
vector<MemRange> item_tuple_buffers(
scan_node_->tuple_desc()->collection_slots().size(), MemRange::null());
for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; ++krow_idx) {
Tuple* kudu_tuple = const_cast<Tuple*>(
reinterpret_cast<const Tuple*>(cur_kudu_batch_.direct_data().data()
+ (krow_idx * scan_node_->row_desc()->GetRowSize())));
++cur_kudu_batch_num_read_;
// Kudu tuples containing TIMESTAMP columns (UNIXTIME_MICROS in Kudu, stored as an
// int64) have 8 bytes of padding following the timestamp. Because this padding is
// provided, Impala can convert these unixtime values to Impala's TimestampValue
// format in place and copy the rows to Impala row batches.
// TODO: avoid mem copies with a Kudu mem 'release' mechanism, attaching mem to the
// batch.
// TODO: consider codegen for this per-timestamp col fixup
for (const SlotDescriptor* slot : timestamp_slots_) {
DCHECK(slot->type().type == TYPE_TIMESTAMP);
if (slot->is_nullable() && kudu_tuple->IsNull(slot->null_indicator_offset())) {
continue;
}
int64_t ts_micros = *reinterpret_cast<int64_t*>(
kudu_tuple->GetSlot(slot->tuple_offset()));
TimestampValue tv;
if (state_->query_options().convert_kudu_utc_timestamps) {
tv = TimestampValue::FromUnixTimeMicros(ts_micros, state_->local_time_zone());
} else {
tv = TimestampValue::UtcFromUnixTimeMicros(ts_micros);
}
if (tv.HasDateAndTime()) {
RawValue::Write(&tv, kudu_tuple, slot, nullptr);
} else {
kudu_tuple->SetNull(slot->null_indicator_offset());
RETURN_IF_ERROR(state_->LogOrReturnError(
ErrorMsg::Init(TErrorCode::KUDU_TIMESTAMP_OUT_OF_RANGE,
scan_node_->table_desc()->table_name(),
scanner_->GetKuduTable()->schema().Column(slot->col_pos()).name())));
Status status = ConvertTimestampFromKudu(kudu_tuple, slot);
if (UNLIKELY(!status.ok())) {
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
}
}
// Kudu tuples containing VARCHAR columns use characters instead of bytes to limit
// the length. In the case of ASCII values there is no difference. However, if
// multi-byte characters are written to Kudu the length could be longer than allowed.
// This checks the actual length and truncates the value length if it is too long.
// TODO(IMPALA-5675): Remove this when Impala supports UTF-8 character VARCHAR length.
for (const SlotDescriptor* slot : varchar_slots_) {
DCHECK(slot->type().type == TYPE_VARCHAR);
if (slot->is_nullable() && kudu_tuple->IsNull(slot->null_indicator_offset())) {
continue;
Status status = ConvertVarcharFromKudu(kudu_tuple, slot);
if (UNLIKELY(!status.ok())) {
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
}
StringValue* sv = reinterpret_cast<StringValue*>(
kudu_tuple->GetSlot(slot->tuple_offset()));
int src_len = sv->Len();
int dst_len = slot->type().len;
if (src_len > dst_len) {
sv->SetLen(dst_len);
}
item_tuple_mem_pool.Clear();
for (int i = 0; i < scan_node_->tuple_desc()->collection_slots().size(); ++i) {
auto slot = scan_node_->tuple_desc()->collection_slots()[i];
Status status = ConvertArrayFromKudu(kudu_tuple, slot, item_tuple_buffers[i],
&item_tuple_mem_pool, kudu_array_element_byte_sizes_[i]);
if (!status.ok()) {
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
}
}
@@ -459,6 +593,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
*tuple_mem = next_tuple(*tuple_mem);
}
expr_results_pool_->Clear();
item_tuple_mem_pool.FreeAll();
// Check the status in case an error status was set during conjunct evaluation.
return state_->GetQueryStatus();

View File

@@ -24,6 +24,7 @@
#include "common/object-pool.h"
#include "exec/kudu/kudu-scan-node-base.h"
#include "runtime/descriptors.h"
#include "util/mem-range.h"
namespace impala {
@@ -144,6 +145,26 @@ class KuduScanner {
/// Varchar slots in the tuple descriptor of the scan node. Used to resize Kudu
/// VARCHAR values inline.
vector<const SlotDescriptor*> varchar_slots_;
// Converts the UNIXTIME_MICROS value in the 'slot' of the 'kudu_tuple' to a
// TimestampValue in place.
Status ConvertTimestampFromKudu(Tuple* kudu_tuple, const SlotDescriptor* slot);
// Converts the VARCHAR value in the 'slot' of the 'kudu_table' to a StringValue
// with length limit in place.
Status ConvertVarcharFromKudu(Tuple* kudu_tuple, const SlotDescriptor* slot);
// The byte size of an element in the Kudu array for each collection slot in the tuple.
vector<size_t> kudu_array_element_byte_sizes_;
// Converts the ARRAY value in the 'slot' of the 'kudu_tuple' to a CollectionValue in
// place, using
// - 'buffer' to hold the item tuples, and
// - 'item_tuple_mem_pool' to allocate memory if 'buffer' is too small.
// 'kudu_array_element_byte_size' is the byte size of an element in the Kudu array
Status ConvertArrayFromKudu(Tuple* kudu_tuple, const SlotDescriptor* slot,
MemRange& buffer, MemPool* item_tuple_mem_pool,
size_t kudu_array_element_byte_size);
};
} /// namespace impala

View File

@@ -161,4 +161,16 @@ Status WriteKuduValue(int col, const ColumnType& col_type, const void* value,
return Status::OK();
}
size_t GetKuduArrayElementSize(const SlotDescriptor* slot) {
// KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES does not apply to array elements.
if (slot->type().IsTimestampType()) {
return sizeof(int64_t);
}
// Same as TupleDescriptor#getSlotSize()
constexpr auto KUDU_SLICE_PADDING = 4U;
if (slot->type().IsStringType()) {
return slot->slot_size() + KUDU_SLICE_PADDING;
}
return slot->slot_size();
}
}

View File

@@ -21,6 +21,7 @@
// TODO: Remove when toolchain callbacks.h properly defines ::tm.
struct tm;
#include <kudu/client/array_cell.h>
#include <kudu/client/callbacks.h>
#include <kudu/client/client.h>
#include <kudu/client/value.h>
@@ -28,6 +29,7 @@ struct tm;
#include "util/kudu-status-util.h"
#include "runtime/string-value.h"
#include "runtime/types.h"
#include "runtime/tuple.h"
namespace impala {
@@ -90,5 +92,7 @@ Status ConvertTimestampValueToKudu(const TimestampValue* tv, int64_t* ts_micros)
// Converts a DateValue to Kudu's representation which is returned in 'days'.
Status ConvertDateValueToKudu(const DateValue* dv, int32_t* days);
size_t IR_ALWAYS_INLINE GetKuduArrayElementSize(const SlotDescriptor* slot);
} /// namespace impala
#endif

View File

@@ -172,7 +172,6 @@ class RawValue {
return top_level ? "NULL" : "null";
}
private:
/// Like Write() but 'value' must be non-NULL.
template <bool COLLECT_VAR_LEN_VALS>
static void WriteNonNull(const void* value, Tuple* tuple,
@@ -180,6 +179,7 @@ private:
std::vector<StringValue*>* string_values,
std::vector<std::pair<CollectionValue*, int64_t>>* collection_values);
private:
/// Recursive helper function for Write() to handle struct slots.
template <bool COLLECT_VAR_LEN_VALS>
static void WriteStruct(const void* value, Tuple* tuple,

View File

@@ -91,7 +91,8 @@ case ${IMPALA_DOCKER_JAVA:-8} in
*)
;;
esac
make -j ${IMPALA_BUILD_THREADS} ${IMAGE_TYPE}_images parquet-reader impala-profile-tool
make -j ${IMPALA_BUILD_THREADS} \
${IMAGE_TYPE}_images parquet-reader impala-profile-tool kudu-array-inserter
source_impala_config

View File

@@ -491,11 +491,11 @@ public class CreateTableStmt extends StatementBase implements SingleTableStmt {
// Check column types are valid Kudu types
for (ColumnDef col: getColumnDefs()) {
try {
KuduUtil.fromImpalaType(col.getType());
} catch (ImpalaRuntimeException e) {
throw new AnalysisException(String.format(
"Cannot create table '%s': %s", getTbl(), e.getMessage()));
if (KuduUtil.fromImpalaType(col.getType()) == null) {
String error_msg =
String.format("Cannot create table '%s': Type %s is not supported in Kudu",
getTbl(), col.getType().toSql());
throw new AnalysisException(error_msg);
}
}
AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS),

View File

@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.impala.analysis.TableRef.ZippingUnnestType;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.Type;
import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
import org.apache.impala.common.AnalysisException;
@@ -90,6 +91,7 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
tblRef.analyze(analyzer);
leftTblRef = tblRef;
if (tblRef instanceof CollectionTableRef) {
checkKuduCollectionSupport((CollectionTableRef)tblRef);
checkIcebergCollectionSupport((CollectionTableRef)tblRef);
checkTopLevelComplexAcidScan(analyzer, (CollectionTableRef)tblRef);
if (firstZippingUnnestRef != null && tblRef.isZippingUnnest() &&
@@ -172,6 +174,15 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
}
}
private void checkKuduCollectionSupport(CollectionTableRef tblRef)
throws AnalysisException {
// TODO(IMPALA-14538): Support referencing a Kudu collection column as a table.
if (tblRef.getTable() instanceof FeKuduTable && !tblRef.isRelative()) {
throw new AnalysisException(
"Referencing a Kudu collection column as a table is not supported.");
}
}
@Override
public FromClause clone() {
List<TableRef> clone = new ArrayList<>();

View File

@@ -384,12 +384,12 @@ public class SlotDescriptor {
}
/**
* Returns true if this slot is of STRING type in a kudu table.
* Returns true if this slot is for a Kudu Slice.
*/
public boolean isKuduStringSlot() {
public boolean isKuduSliceSlot() {
if (getParent() == null) return false;
if (!(getParent().getTable() instanceof FeKuduTable)) return false;
return getType().isStringType();
return getType().isStringType() || getType().isArrayType();
}
/**

View File

@@ -75,8 +75,8 @@ import com.google.common.base.Preconditions;
* Offsets: 0 12 16 18 19
*/
public class TupleDescriptor {
// Padding size in bytes for Kudu string slots.
private static final int KUDU_STRING_PADDING = 4;
// Padding size in bytes for Kudu Slice slots.
private static final int KUDU_SLICE_PADDING = 4;
private final TupleId id_;
private final String debugName_; // debug-only
@@ -466,9 +466,7 @@ public class TupleDescriptor {
private int getSlotSize(SlotDescriptor slotDesc) {
int slotSize = slotDesc.getMaterializedSlotSize();
// Add padding for a KUDU string slot.
if (slotDesc.isKuduStringSlot()) {
slotSize += KUDU_STRING_PADDING;
}
if (slotDesc.isKuduSliceSlot()) { slotSize += KUDU_SLICE_PADDING; }
return slotSize;
}
@@ -489,9 +487,9 @@ public class TupleDescriptor {
avgSerializedSize_ += slotDesc.getMaterializedSlotSize();
}
// Add padding for a KUDU string slot.
if (slotDesc.isKuduStringSlot()) {
avgSerializedSize_ += KUDU_STRING_PADDING;
serializedPadSize_ += KUDU_STRING_PADDING;
if (slotDesc.isKuduSliceSlot()) {
avgSerializedSize_ += KUDU_SLICE_PADDING;
serializedPadSize_ += KUDU_SLICE_PADDING;
}
}

View File

@@ -86,7 +86,7 @@ public class KuduColumn extends Column {
public static KuduColumn fromColumnSchema(ColumnSchema colSchema, int position)
throws ImpalaRuntimeException {
Type type = KuduUtil.toImpalaType(colSchema.getType(), colSchema.getTypeAttributes());
Type type = KuduUtil.toImpalaType(colSchema);
Object defaultValue = colSchema.getDefaultValue();
LiteralExpr defaultValueExpr = null;
if (defaultValue != null) {
@@ -144,7 +144,7 @@ public class KuduColumn extends Column {
org.apache.kudu.Type kuduType = Schema.getAutoIncrementingColumnType();
Preconditions.checkArgument(kuduType != org.apache.kudu.Type.DECIMAL &&
kuduType != org.apache.kudu.Type.VARCHAR);
Type type = KuduUtil.toImpalaType(kuduType, null);
Type type = KuduUtil.toImpalaScalarType(kuduType, null);
return new KuduColumn(Schema.getAutoIncrementingColumnName(), type,
/* isKey */true, /* isPrimaryKeyUnique */false, /* isNullable */false,
/* isAutoIncrementing */true, /* encoding */null, /* compression */null,

View File

@@ -204,8 +204,7 @@ public class KuduScanNode extends ScanNode {
"outdated and need to be refreshed.");
}
Type kuduColType =
KuduUtil.toImpalaType(kuduCol.getType(), kuduCol.getTypeAttributes());
Type kuduColType = KuduUtil.toImpalaType(kuduCol);
if (!colType.equals(kuduColType)) {
throw new ImpalaRuntimeException("Column '" + colName + "' is type " +
kuduColType.toSql() + " but Impala expected " + colType.toSql() +

View File

@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.KuduTable;
@@ -37,8 +38,8 @@ import org.apache.impala.common.PrintUtils;
import org.apache.impala.thrift.TAlterTableAddDropRangePartitionParams;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.TKuduPartitionParam;
import org.apache.impala.thrift.TKuduPartitionByHashParam;
import org.apache.impala.thrift.TKuduPartitionParam;
import org.apache.impala.thrift.TRangePartition;
import org.apache.impala.thrift.TRangePartitionOperationType;
import org.apache.impala.util.EventSequence;
@@ -163,9 +164,18 @@ public class KuduCatalogOpExecutor {
boolean isKeyUnique) throws ImpalaRuntimeException {
Type type = Type.fromThrift(column.getColumnType());
Preconditions.checkState(type != null);
// If the type is an ARRAY type, we need to
// 1. Create a ColumnSchemaBuilder for its item type first, and
// 2. Make it an ARRAY column by calling ColumnSchemaBuilder#array().
// Otherwise we need to create a ColumnSchemaBuilder for the type itself.
org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
if (kuduType == null) {
throw new ImpalaRuntimeException(
String.format("Type %s is not supported in Kudu", type.toSql()));
}
ColumnSchemaBuilder csb = new ColumnSchemaBuilder(column.getColumnName(), kuduType);
Type typeOrItemType = type;
if (type.isArrayType()) { typeOrItemType = ((ArrayType) type).getItemType(); }
if (isKey && !isKeyUnique) {
csb.nonUniqueKey(true);
} else {
@@ -183,7 +193,7 @@ public class KuduCatalogOpExecutor {
}
if (column.isSetDefault_value()) {
csb.defaultValue(KuduUtil.getKuduDefaultValue(
column.getDefault_value(), type, column.getColumnName()));
column.getDefault_value(), typeOrItemType, column.getColumnName()));
}
if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size());
if (column.isSetEncoding()) {
@@ -192,17 +202,17 @@ public class KuduCatalogOpExecutor {
if (column.isSetCompression()) {
csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression()));
}
if (type.isDecimal()) {
csb.typeAttributes(
DecimalUtil.typeAttributes(type.getPrecision(), type.getDecimalDigits()));
if (typeOrItemType.isDecimal()) {
csb.typeAttributes(DecimalUtil.typeAttributes(
typeOrItemType.getPrecision(), typeOrItemType.getDecimalDigits()));
}
if (kuduType == org.apache.kudu.Type.VARCHAR) {
csb.typeAttributes(
CharUtil.typeAttributes(type.getColumnSize()));
if (typeOrItemType.isVarchar()) {
csb.typeAttributes(CharUtil.typeAttributes(typeOrItemType.getColumnSize()));
}
if (column.isSetComment() && !column.getComment().isEmpty()) {
csb.comment(column.getComment());
}
if (type.isArrayType()) { csb.array(true); }
return csb.build();
}
@@ -385,8 +395,7 @@ public class KuduCatalogOpExecutor {
"Error loading Kudu table: Impala does not support column names that " +
"differ only in casing '%s'", colSchema.getName()));
}
Type type =
KuduUtil.toImpalaType(colSchema.getType(), colSchema.getTypeAttributes());
Type type = KuduUtil.toImpalaType(colSchema);
String comment =
!colSchema.getComment().isEmpty() ? colSchema.getComment() : null;
cols.add(new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(),

View File

@@ -28,30 +28,28 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.DescriptorTable;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.FunctionCallExpr;
import org.apache.impala.analysis.InsertStmt;
import org.apache.impala.analysis.KuduPartitionExpr;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.FeKuduTable;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnEncoding;
import org.apache.impala.thrift.TColumnValue;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TExprNode;
import org.apache.impala.thrift.TExprNodeType;
import org.apache.impala.thrift.THdfsCompression;
import org.apache.impala.util.StringUtils;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.ColumnSchema.Encoding;
@@ -226,6 +224,10 @@ public class KuduUtil {
TExprNode literal = defaultValue.getNodes().get(0);
if (literal.getNode_type() == TExprNodeType.NULL_LITERAL) return null;
org.apache.kudu.Type type = KuduUtil.fromImpalaType(impalaType);
if (type == null) {
throw new ImpalaRuntimeException(
String.format("Type %s is not supported in Kudu", impalaType.toSql()));
}
switch (type) {
case INT8:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
@@ -423,15 +425,24 @@ public class KuduUtil {
}
/**
* Converts a given Impala catalog type to the Kudu type. Throws an exception if the
* type cannot be converted.
* Converts a given Impala catalog type or its item type to the Kudu type.
* Returns null if the type cannot be converted instead of throwing an exception so
* that the caller can report the full type in the error message. Since this function
* contains recursion, only the outer-most caller has the full type info.
*/
public static org.apache.kudu.Type fromImpalaType(Type t)
throws ImpalaRuntimeException {
public static @Nullable org.apache.kudu.Type fromImpalaType(Type t) {
if (!t.isScalarType()) {
throw new ImpalaRuntimeException(format(
"Type %s is not supported in Kudu", t.toSql()));
// Kudu does not support complex types other than ARRAY.
if (!t.isArrayType()) { return null; }
Type itemType = ((ArrayType) t).getItemType();
// Kudu does not support array of non-scalar types or 16-byte DECIMAL.
if (!itemType.isScalarType()
|| ((ScalarType) itemType).storageBytesForDecimal() == 16) {
return null;
}
return KuduUtil.fromImpalaType(itemType);
}
ScalarType s = (ScalarType) t;
switch (s.getPrimitiveType()) {
case TINYINT: return org.apache.kudu.Type.INT8;
@@ -452,14 +463,16 @@ public class KuduUtil {
case NULL_TYPE:
case DATETIME:
case CHAR:
default:
throw new ImpalaRuntimeException(format(
"Type %s is not supported in Kudu", s.toSql()));
default: return null;
}
}
public static Type toImpalaType(org.apache.kudu.Type t,
/**
* Converts a given Kudu scalar type to its matching Impala scalar type.
*/
public static ScalarType toImpalaScalarType(org.apache.kudu.Type t,
ColumnTypeAttributes typeAttributes) throws ImpalaRuntimeException {
Preconditions.checkState(t != org.apache.kudu.Type.NESTED);
switch (t) {
case BOOL: return Type.BOOLEAN;
case DOUBLE: return Type.DOUBLE;
@@ -482,6 +495,25 @@ public class KuduUtil {
}
}
/**
* Converts a given Kudu colSchema to its matching Impala type. If the Kudu type is
* NESTED, it must be an array and the element type is converted to the
* corresponding Impala type.
*/
public static Type toImpalaType(org.apache.kudu.ColumnSchema colSchema)
throws ImpalaRuntimeException {
org.apache.kudu.Type t = colSchema.getType();
ColumnTypeAttributes typeAttributes = colSchema.getTypeAttributes();
if (t != org.apache.kudu.Type.NESTED) {
return toImpalaScalarType(t, typeAttributes);
}
Preconditions.checkState(colSchema.getNestedTypeDescriptor() != null);
Preconditions.checkState(colSchema.getNestedTypeDescriptor().isArray());
org.apache.kudu.Type kuduElementType =
colSchema.getNestedTypeDescriptor().getArrayDescriptor().getElemType();
return new ArrayType(toImpalaScalarType(kuduElementType, typeAttributes));
}
/**
* Creates and returns an Expr that takes rows being inserted by 'insertStmt' and
* returns the partition number for each row.

View File

@@ -2683,9 +2683,11 @@ public class AnalyzeDDLTest extends FrontendTestBase {
AnalysisError("create table t primary key (id) partition by hash partitions 3" +
" stored as kudu as select id, m from functional.complextypes_fileformat",
"Cannot create table 't': Type MAP<STRING,BIGINT> is not supported in Kudu");
AnalysisError("create table t primary key (id) partition by hash partitions 3" +
" stored as kudu as select id, a from functional.complextypes_fileformat",
"Cannot create table 't': Type ARRAY<INT> is not supported in Kudu");
AnalysisError("create table t primary key (id) partition by hash partitions 3"
+ " stored as kudu as select id, a from functional.complextypes_fileformat",
"Unable to INSERT into target table (default.t) because the column 'a' has "
+ "a complex type 'ARRAY<INT>' and Impala doesn't support inserting into "
+ "tables containing complex type columns");
// IMPALA-6454: CTAS into Kudu tables with primary key specified in upper case.
AnalyzesOk("create table part_kudu_tbl primary key(INT_COL, SMALLINT_COL, ID)" +

View File

@@ -390,10 +390,18 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
"partitioning columns: (1 vs 2). Range partition: 'PARTITION 0 < VALUES <= 1'",
isExternalPurgeTbl);
// Test supported Kudu complex types
AnalyzesOk("create table tab (x int primary key, y ARRAY<INT>) "
+ "partition by hash(x) partitions 3 stored as kudu",
isExternalPurgeTbl);
// Test unsupported Kudu types
List<String> unsupportedTypes = Lists.newArrayList("CHAR(20)",
"STRUCT<f1:INT,f2:STRING>", "ARRAY<INT>", "MAP<STRING,STRING>");
List<String> unsupportedTypes = Lists.newArrayList(
"CHAR(20)", "STRUCT<f1:INT,f2:STRING>", "MAP<STRING,STRING>",
// ARRAY of any complex type or 16-byte DECIMAL is not supported yet.
"ARRAY<ARRAY<INT>>", "ARRAY<MAP<INT,INT>>", "ARRAY<STRUCT<a:INT,b:INT>>",
"ARRAY<DECIMAL(19,19)>"
);
for (String t: unsupportedTypes) {
String expectedError = String.format(
"Cannot create table 'tab': Type %s is not supported in Kudu", t);
@@ -533,6 +541,10 @@ public class AnalyzeKuduDDLTest extends FrontendTestBase {
"default isnull(null, null)) partition by hash (i) partitions 3 " +
"stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
"'x'", isExternalPurgeTbl);
AnalysisError("create table tab (i int primary key, a array<int> default null) " +
"partition by hash (i) partitions 3 stored as kudu",
"Default value NULL (type: NULL_TYPE) is not compatible with column " +
"'a' (type: ARRAY<INT>).", isExternalPurgeTbl);
// Invalid block_size values
AnalysisError("create table tab (i int primary key block_size 1.1) " +
"partition by hash (i) partitions 3 stored as kudu", "Invalid value " +

View File

@@ -747,3 +747,8 @@ stored as kudu
---- CATCH
AnalysisException: BOOLEAN type is not allowed to be part of a PRIMARY KEY therefore not allowed for range-partitioning.
====
---- QUERY
# Array primary key column
create table tab_array_primary_key (array_int array<int> primary key) stored as kudu
---- CATCH
IllegalArgumentException: Array type column: array_int cannot be a key column

View File

@@ -19,6 +19,7 @@ from __future__ import absolute_import, division, print_function
from builtins import range
from copy import deepcopy
from base64 import b64decode
from kudu.schema import (
BOOL,
DOUBLE,
@@ -34,10 +35,13 @@ from kudu.schema import (
DATE)
from kudu.client import Partitioning
from kudu.util import to_unixtime_micros
import json
import logging
import pytest
import os
import random
import re
import subprocess
import textwrap
import threading
import time
@@ -1844,3 +1848,248 @@ class TestKuduInsertWithBufferedTupleDesc(KuduTestSuite):
except Exception as e:
# Not expect to throw exception like "IllegalStateException: null"
assert False, str(e)
class TestKuduArray(KuduTestSuite):
"""
Tests Kudu 1-D array suppport.
"""
def _get_name_from_type(self, data_type):
return re.split("[(<)]", data_type)[0]
def _insert_arrays_into_kudu(self, kudu_table_name):
exec_path = os.environ["IMPALA_HOME"] + \
"/be/build/latest/exec/kudu/kudu-array-inserter"
self.client.log_client(str([exec_path, kudu_table_name]))
subprocess.check_call([exec_path, kudu_table_name])
def _check_table_schema(self, db, table_name, types):
result = self.execute_query("DESCRIBE {0}.{1}".format(db, table_name))
assert ("id", "tinyint") == result.tuples()[0][:2]
for i in range(1, len(result.tuples())):
(col_name, col_type) = result.tuples()[i][:2]
assert (col_type == "array<{0}>".format(types[i - 1].lower())
and col_name == "array_" + self._get_name_from_type(types[i - 1]).lower())
# See be/src/exec/kudu/kudu-array-inserter.cc for the test data
EXPECTED_COLUMNS = {
"INT": (
None,
'[-2147483648,-1,2147483647]',
'[]',
'[null,-1,2147483647]',
'[-2147483648,null,2147483647]',
'[-2147483648,-1,null]',
'[-2147483648,-1,2147483647,-2147483648,-1]',
),
"TIMESTAMP": (
None,
('["1400-01-01 00:00:00",'
'"1969-12-31 23:59:59.999999000",'
'"9999-12-31 23:59:59.999999000"]'),
'[]',
'[null,"1969-12-31 23:59:59.999999000","9999-12-31 23:59:59.999999000"]',
'["1400-01-01 00:00:00",null,"9999-12-31 23:59:59.999999000"]',
'["1400-01-01 00:00:00","1969-12-31 23:59:59.999999000",null]',
('["1400-01-01 00:00:00",'
'"1969-12-31 23:59:59.999999000",'
'"9999-12-31 23:59:59.999999000",'
'"1400-01-01 00:00:00",'
'"1969-12-31 23:59:59.999999000"]'),
),
# The output of the ARRAY<VARCHAR(1)> data are NOT valid UTF-8 strings.
"VARCHAR(1)": (
None,
b'["\xce","\xcf","\xce"]',
'[]',
b'[null,"\xcf","\xce"]',
b'["\xce",null,"\xce"]',
b'["\xce","\xcf",null]',
b'["\xce","\xcf","\xce","\xce","\xcf"]',
),
"DECIMAL(18,18)": (
None,
'[-0.999999999999999999,-0.000000000000000001,0.999999999999999999]',
'[]',
'[null,-0.000000000000000001,0.999999999999999999]',
'[-0.999999999999999999,null,0.999999999999999999]',
'[-0.999999999999999999,-0.000000000000000001,null]',
('[-0.999999999999999999,-0.000000000000000001,0.999999999999999999,'
'-0.999999999999999999,-0.000000000000000001]'),
),
"DOUBLE": (
None,
'[-Infinity,NaN,Infinity]',
'[]',
'[null,NaN,Infinity]',
'[-Infinity,null,Infinity]',
'[-Infinity,NaN,null]',
'[-Infinity,NaN,Infinity,-Infinity,NaN]',
),
# The output of each element in an ARRAY<BINARY> is Base64 encoded.
"BINARY": (
None,
'["zqM=","z4A=","zrs="]',
'[]',
'[null,"z4A=","zrs="]',
'["zqM=",null,"zrs="]',
'["zqM=","z4A=",null]',
'["zqM=","z4A=","zrs=","zqM=","z4A="]',
),
"BOOLEAN": (
None,
'[true,false,true]',
'[]',
'[null,false,true]',
'[true,null,true]',
'[true,false,null]',
'[true,false,true,true,false]',
),
}
def _check_table_data(self, db, table_name, types, query_options):
columns = ", ".join([
"array_{0}".format(self._get_name_from_type(item_type))
for item_type in types
])
result = self.execute_query("SELECT id, {0} FROM {1}.{2}".format(
columns, db, table_name), query_options=query_options)
for i, result_column in enumerate(zip(*result.tuples())):
if i == 0:
assert result_column == tuple(range(len(result.tuples())))
else:
assert result_column == self.EXPECTED_COLUMNS[types[i - 1]]
def _unnest_expected_column(self, item_type):
if item_type == "VARCHAR(1)":
return (
b'\xce', b'\xcf', b'\xce',
None, b'\xcf', b'\xce',
b'\xce', None, b'\xce',
b'\xce', b'\xcf', None,
b'\xce', b'\xcf', b'\xce', b'\xce', b'\xcf',
)
result = []
for serialized_array in self.EXPECTED_COLUMNS[item_type]:
if serialized_array is not None:
array = json.loads(
serialized_array,
parse_float=(lambda s: s) if item_type.startswith("DECIMAL") else None)
if item_type == "BINARY":
result += [b64decode(elem) if elem is not None else None for elem in array]
else:
result += array
return tuple(result)
EXPECTED_ID_UNNESTED = (
1, 1, 1,
3, 3, 3,
4, 4, 4,
5, 5, 5,
6, 6, 6, 6, 6
)
def _check_unnest(self, db, table_name, types, query_options, in_select_list):
if in_select_list:
columns = ", ".join([
"UNNEST(array_{0})".format(self._get_name_from_type(item_type))
for item_type in types
])
result = self.execute_query("SELECT id, {0} FROM {1}.{2}".format(
columns, db, table_name), query_options=query_options)
else:
columns = ", ".join([
"{0}.array_{1}".format(table_name, self._get_name_from_type(item_type))
for item_type in types
])
result = self.execute_query("SELECT * FROM {1}.{2}, UNNEST({0})".format(
columns, db, table_name), query_options=query_options)
for i, result_column in enumerate(zip(*result.tuples())):
if i == 0:
assert result_column == self.EXPECTED_ID_UNNESTED
elif types[i - 1] == "DOUBLE":
# NaN cannot be compared directly.
assert str(result_column) == str(self._unnest_expected_column(types[i - 1]))
else:
assert result_column == self._unnest_expected_column(types[i - 1])
def _check_non_materialzied_elements(self, db, table_name, item_type, options):
result = self.execute_query("SELECT id FROM {0}.{1} AS t, t.array_{2}".format(
db, table_name, self._get_name_from_type(item_type)), query_options=options)
for result_column in zip(*result.tuples()):
assert result_column == self.EXPECTED_ID_UNNESTED
def test_supported_types(self, unique_database, vector):
"""
Test array column support for kudu against [unique_database].kudu_array
and external table [unique_database].kudu_array_external.
"""
db = unique_database
options = vector.get_value('exec_option')
SUPPORTED_ITEM_TYPES = [
"INT",
"TIMESTAMP",
"VARCHAR(1)",
"DECIMAL(18,18)",
"DOUBLE",
"BINARY",
"BOOLEAN",
]
TEST_TABLE, TEST_EXTERNAL_TABLE = "kudu_array", "kudu_array_external"
column_defs = ", ".join([
"array_{0} ARRAY<{1}>".format(self._get_name_from_type(item_type), item_type)
for item_type in SUPPORTED_ITEM_TYPES
])
# The table is unpartitioned to ensure the rows are read in the insertion order.
create_table_sql = (
"CREATE TABLE {0}.{1} (id TINYINT PRIMARY KEY, {2}) "
"STORED AS KUDU"
)
# Create table [unique_database].kudu_array
self.execute_query(create_table_sql.format(db, TEST_TABLE, column_defs))
self._check_table_schema(db, TEST_TABLE, SUPPORTED_ITEM_TYPES)
# Create external table [unique_database].kudu_array_external pointing to the same
# table as [unique_database].kudu_array and check the schema as well.
kudu_table_name = "impala::{0}.{1}".format(db, TEST_TABLE)
self.execute_query(create_external_kudu_query(
db, TEST_EXTERNAL_TABLE, kudu_table_name))
self._check_table_schema(
db, TEST_EXTERNAL_TABLE, SUPPORTED_ITEM_TYPES)
# Insert some rows using kudu-array-inserter and read the data back
# through both table.
self._insert_arrays_into_kudu(kudu_table_name)
self._check_table_data(
db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options)
self._check_table_data(
db, TEST_EXTERNAL_TABLE, SUPPORTED_ITEM_TYPES, options)
# Check the result of UNNEST().
self._check_unnest(
db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options, in_select_list=True)
self._check_unnest(
db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options, in_select_list=False)
# Check the result when the array elements are not materialized.
self._check_non_materialzied_elements(db, TEST_TABLE, "BINARY", options)
# TODO(IMPALA-14539): Support duplicate collection slots.
sql = "SELECT array_{2} FROM {0}.{1}, {1}.array_{2} AS unnested"
exc = str(self.execute_query_expect_failure(self.client, sql.format(
db, TEST_TABLE, "INT")))
assert (
"Unable to deserialize scan token for node with id '0' for Kudu table "
"'impala::{0}.{1}': Invalid argument: Duplicate column name: array_{2}"
).format(db, TEST_TABLE, "int") in exc
# TODO(IMPALA-14538): Support referencing a Kudu collection column as a table.
sql = "SELECT pos, item FROM {0}.{1}.array_{2}"
exc = str(self.execute_query_expect_failure(self.client, sql.format(
db, TEST_TABLE, "INT")))
assert (
"AnalysisException: "
"Referencing a Kudu collection column as a table is not supported."
) in exc