From 6cb7cecacf0c091a19eb261f3a9c7d841c0e0676 Mon Sep 17 00:00:00 2001 From: Gabor Kaszab Date: Wed, 18 Nov 2020 15:29:20 +0100 Subject: [PATCH] IMPALA-10237: Support Bucket and Truncate partition transforms as built-in functions This patch implements Truncate and Bucket partition transforms in Impala BE as built-in functions. The expectation is that these functions give the same result as Iceberg's implementation of the same functions. These built-in functions are invisible so users won't be able to invoke them e.g. from impala-shell. Truncate: - Supported types are IntVal, BigIntVal, StringVal, DecimalVal. - Receives an input from the above types and a width. - Returns the same type as the input. - Expected behaviour is explained here: https://iceberg.apache.org/spec/#truncate-transform-details Bucket: - Supported types are IntVal, BigIntVal, StringVal, DecimalVal, DateVal, TimestampVal. - Receives an input from the above types and the number of buckets as IntVal. - Returns IntVal. - Expected behaviour is explained here: https://iceberg.apache.org/spec/#bucket-transform-details Change-Id: I485680cf79d96d578dd8cfbfd554bec468fe84bd Reviewed-on: http://gerrit.cloudera.org:8080/16741 Reviewed-by: Gabor Kaszab Tested-by: Impala Public Jenkins --- be/src/codegen/impala-ir.cc | 1 + be/src/exprs/CMakeLists.txt | 6 + be/src/exprs/iceberg-functions-ir.cc | 216 +++++++ be/src/exprs/iceberg-functions-test.cc | 593 +++++++++++++++++++ be/src/exprs/iceberg-functions.h | 109 ++++ be/src/exprs/scalar-expr-evaluator.cc | 4 +- be/src/thirdparty/murmurhash/MurmurHash3.cpp | 335 +++++++++++ be/src/thirdparty/murmurhash/MurmurHash3.h | 40 ++ be/src/thirdparty/murmurhash/README.md | 9 + be/src/util/bit-util-test.cc | 70 +++ be/src/util/bit-util.h | 29 + bin/rat_exclude_files.txt | 1 + bin/run_clang_tidy.sh | 2 +- common/function-registry/impala_functions.py | 25 + 14 files changed, 1438 insertions(+), 2 deletions(-) create mode 100644 be/src/exprs/iceberg-functions-ir.cc create mode 100644 be/src/exprs/iceberg-functions-test.cc create mode 100644 be/src/exprs/iceberg-functions.h create mode 100644 be/src/thirdparty/murmurhash/MurmurHash3.cpp create mode 100644 be/src/thirdparty/murmurhash/MurmurHash3.h create mode 100644 be/src/thirdparty/murmurhash/README.md diff --git a/be/src/codegen/impala-ir.cc b/be/src/codegen/impala-ir.cc index 0c0339945..8ad457359 100644 --- a/be/src/codegen/impala-ir.cc +++ b/be/src/codegen/impala-ir.cc @@ -49,6 +49,7 @@ #include "exprs/decimal-functions-ir.cc" #include "exprs/decimal-operators-ir.cc" #include "exprs/hive-udf-call-ir.cc" +#include "exprs/iceberg-functions-ir.cc" #include "exprs/in-predicate-ir.cc" #include "exprs/is-null-predicate-ir.cc" #include "exprs/kudu-partition-expr-ir.cc" diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index c89800709..9652a5b7c 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -22,6 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs") # where to put generated binaries set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs") +set(MURMURHASH_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/murmurhash") + add_library(Exprs agg-fn.cc agg-fn-evaluator.cc @@ -44,6 +46,7 @@ add_library(Exprs expr.cc hive-udf-call.cc hive-udf-call-ir.cc + iceberg-functions-ir.cc in-predicate-ir.cc is-not-empty-predicate.cc is-null-predicate-ir.cc @@ -54,6 +57,7 @@ add_library(Exprs literal.cc mask-functions-ir.cc math-functions-ir.cc + ${MURMURHASH_SRC_DIR}/MurmurHash3.cpp null-literal.cc operators-ir.cc scalar-expr.cc @@ -79,12 +83,14 @@ add_dependencies(Exprs gen-deps gen_ir_descriptions) add_library(ExprsTests STATIC datasketches-test.cc expr-test.cc + iceberg-functions-test.cc timezone_db-test.cc ) add_dependencies(ExprsTests gen-deps) ADD_UNIFIED_BE_LSAN_TEST(datasketches-test "TestDataSketchesHll.*:TestDataSketchesKll.*:TestDataSketchesCpc.*") +ADD_UNIFIED_BE_LSAN_TEST(iceberg-functions-test "TestIcebergFunctions.*") ADD_UNIFIED_BE_LSAN_TEST(expr-test "Instantiations/ExprTest.*") # Exception to unified be tests: custom main initiailizes LLVM ADD_BE_LSAN_TEST(expr-codegen-test) diff --git a/be/src/exprs/iceberg-functions-ir.cc b/be/src/exprs/iceberg-functions-ir.cc new file mode 100644 index 000000000..2194e6f5a --- /dev/null +++ b/be/src/exprs/iceberg-functions-ir.cc @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/iceberg-functions.h" + +#include +#include + +#include "common/compiler-util.h" +#include "common/logging.h" +#include "runtime/timestamp-value.inline.h" +#include "thirdparty/murmurhash/MurmurHash3.h" +#include "udf/udf-internal.h" +#include "util/bit-util.h" + +namespace impala { + +using std::numeric_limits; + +const std::string IcebergFunctions::INCORRECT_WIDTH_ERROR_MSG = + "Width parameter should be greater than zero."; + +const std::string IcebergFunctions::TRUNCATE_OVERFLOW_ERROR_MSG = + "Truncate operation overflows for the given input."; + +const unsigned IcebergFunctions::BUCKET_TRANSFORM_SEED = 0; + +IntVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx, + const IntVal& input, const IntVal& width) { + return TruncatePartitionTransformNumericImpl(ctx, input, width); +} + +BigIntVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx, + const BigIntVal& input, const BigIntVal& width) { + return TruncatePartitionTransformNumericImpl(ctx, input, width); +} + +DecimalVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx, + const DecimalVal& input, const IntVal& width) { + return TruncatePartitionTransform(ctx, input, BigIntVal(width.val)); +} + +DecimalVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx, + const DecimalVal& input, const BigIntVal& width) { + if (!CheckInputsAndSetError(ctx, input, width)) return DecimalVal::null(); + int decimal_size = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0); + return TruncateDecimal(ctx, input, width, decimal_size); +} + +DecimalVal IcebergFunctions::TruncateDecimal(FunctionContext* ctx, + const DecimalVal& input, const BigIntVal& width, int decimal_size) { + switch (decimal_size) { + case 4: { + // If the input is negative and within int32 range while the width is bigger than + // int32 max value than the expected would be to return the width. However, it + // won't fit into an int32 so an overflow error is returned instead. + if (UNLIKELY(input.val4 < 0 && width.val > std::numeric_limits::max())) { + ctx->SetError(TRUNCATE_OVERFLOW_ERROR_MSG.c_str()); + return DecimalVal::null(); + } + return TruncatePartitionTransformDecimalImpl(input.val4, width.val); + } + case 8: { + return TruncatePartitionTransformDecimalImpl(input.val8, width.val); + } + case 16: { + DecimalVal result = TruncatePartitionTransformDecimalImpl(input.val16, width.val); + if (input.val16 < 0 && result.val16 > 0) { + ctx->SetError(TRUNCATE_OVERFLOW_ERROR_MSG.c_str()); + return DecimalVal::null(); + } + return result; + } + default: + return DecimalVal::null(); + } +} + +template +DecimalVal IcebergFunctions::TruncatePartitionTransformDecimalImpl(const T& decimal_val, + int64_t width) { + DCHECK(width > 0); + if (decimal_val > 0) return decimal_val - (decimal_val % width); + return decimal_val - (((decimal_val % width) + width) % width); +} + +StringVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx, + const StringVal& input, const IntVal& width) { + if (!CheckInputsAndSetError(ctx, input, width)) return StringVal::null(); + if (input.len <= width.val) return input; + return StringVal::CopyFrom(ctx, input.ptr, width.val); +} + +template +T IcebergFunctions::TruncatePartitionTransformNumericImpl(FunctionContext* ctx, + const T& input, const W& width) { + if (!CheckInputsAndSetError(ctx, input, width)) return T::null(); + if (input.val >= 0) return input.val - (input.val % width.val); + T result = input.val - (((input.val % width.val) + width.val) % width.val); + if (UNLIKELY(result.val > 0)) { + ctx->SetError(TRUNCATE_OVERFLOW_ERROR_MSG.c_str()); + return T::null(); + } + return result; +} + +IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx, + const IntVal& input, const IntVal& width) { + return BucketPartitionTransformNumericImpl(ctx, input, width); +} + +IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx, + const BigIntVal& input, const IntVal& width) { + return BucketPartitionTransformNumericImpl(ctx, input, width); +} + +IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx, + const DecimalVal& input, const IntVal& width) { + if (!CheckInputsAndSetError(ctx, input, width)) return IntVal::null(); + int decimal_size = ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0); + return BucketDecimal(ctx, input, width, decimal_size); +} + +IntVal IcebergFunctions::BucketDecimal(FunctionContext* ctx, + const DecimalVal& input, const IntVal& width, int decimal_size) { + DCHECK(!input.is_null || !width.is_null); + // Iceberg converts the decimal input's unscaled value into a byte array and hash-es + // that byte array instead of the underlying integer representation. We do the same + // here to be compatible with Iceberg. + std::string buffer; + switch (decimal_size) { + case 4: { + buffer = BitUtil::IntToByteBuffer(input.val4); + break; + } + case 8: { + buffer = BitUtil::IntToByteBuffer(input.val8); + break; + } + case 16: { + buffer = BitUtil::IntToByteBuffer(input.val16); + break; + } + default: { + return IntVal::null(); + } + } + int hash_result; + MurmurHash3_x86_32(buffer.data(), buffer.size(), BUCKET_TRANSFORM_SEED, + &hash_result); + return (hash_result & std::numeric_limits::max()) % width.val; +} + +IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx, + const StringVal& input, const IntVal& width) { + if (!CheckInputsAndSetError(ctx, input, width)) return IntVal::null(); + int hash_result; + MurmurHash3_x86_32(input.ptr, input.len, BUCKET_TRANSFORM_SEED, &hash_result); + return (hash_result & std::numeric_limits::max()) % width.val; +} + +IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx, + const DateVal& input, const IntVal& width) { + if (input.is_null) return IntVal::null(); + return BucketPartitionTransformNumericImpl(ctx, IntVal(input.val), width); +} + +IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx, + const TimestampVal& input, const IntVal& width) { + if (input.is_null) return IntVal::null(); + TimestampValue tv = TimestampValue::FromTimestampVal(input); + // Iceberg stores timestamps in int64 so to be in line Impala's bucket partition + // transform also uses an int64 representation. + int64_t micros_since_epoch; + tv.FloorUtcToUnixTimeMicros(µs_since_epoch); + return BucketPartitionTransformNumericImpl(ctx, BigIntVal(micros_since_epoch), width); +} + +template +IntVal IcebergFunctions::BucketPartitionTransformNumericImpl(FunctionContext* ctx, + const T& input, const W& width) { + if (!CheckInputsAndSetError(ctx, input, width)) return IntVal::null(); + // Iceberg's int based bucket partition transform converts the int input to long before + // getting the hash value of it. We do the same to guarantee compatibility. + long key = (long)input.val; + int hash_result; + MurmurHash3_x86_32(&key, sizeof(long), BUCKET_TRANSFORM_SEED, &hash_result); + return (hash_result & std::numeric_limits::max()) % width.val; +} + +template +bool IcebergFunctions::CheckInputsAndSetError(FunctionContext* ctx, const T& input, + const W& width) { + if (width.is_null || width.val <= 0) { + ctx->SetError(INCORRECT_WIDTH_ERROR_MSG.c_str()); + return false; + } + if (input.is_null) return false; + return true; +} + +} diff --git a/be/src/exprs/iceberg-functions-test.cc b/be/src/exprs/iceberg-functions-test.cc new file mode 100644 index 000000000..7d7167ef5 --- /dev/null +++ b/be/src/exprs/iceberg-functions-test.cc @@ -0,0 +1,593 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exprs/iceberg-functions.h" +#include "thirdparty/murmurhash/MurmurHash3.h" + +#include +#include +#include + +#include "runtime/date-value.h" +#include "runtime/exec-env.h" +#include "runtime/mem-pool.h" +#include "runtime/mem-tracker.h" +#include "runtime/runtime-state.h" +#include "runtime/timestamp-value.h" +#include "testutil/gtest-util.h" +#include "udf/udf-internal.h" + +namespace impala { + +// Create a FunctionContext for tests that doesn't allocate memory through a mem pool. +FunctionContext* CreateFunctionContext( + MemPool* pool) { + FunctionContext::TypeDesc return_type; + return_type.type = FunctionContext::Type::TYPE_INT; + FunctionContext::TypeDesc param_desc; + param_desc.type = FunctionContext::Type::TYPE_INT; + std::vector arg_types(2, param_desc); + FunctionContext* ctx = FunctionContextImpl::CreateContext( + nullptr, pool, pool, return_type, arg_types, 0, true); + EXPECT_TRUE(ctx != nullptr); + return ctx; +} + +// Create a FunctionContext where the used mem pool comes as a parameter. Can be used +// for tests that allocate memory through mem pool, e.g. truncate strings tests. +FunctionContext* CreateFunctionContext() { + MemTracker m; + MemPool pool(&m); + return CreateFunctionContext(&pool); +} + +// numeric_limits seems to have a bug when used with __int128_t as it returns zero +// for both the min and the max value. This function gives the min value for 128 byte +// int. +__int128_t GetMinForInt128() { + __int128_t num = -1; + for (int i = 0; i < 126; ++i) { + num <<= 1; + num -= 1; + } + return num; +} + +// All the expected values in the tests below are reflecting what Iceberg would return +// for the given inputs. I used Iceberg's org.apache.iceberg.transforms.TestBucketing +// unit tests to check what is the expected result for a particular input. + +class IcebergTruncatePartitionTransformTests { +public: + static void TestIntegerNumbers(); + + static void TestString(); + + static void TestDecimal(); +private: + template + static void TestIntegerNumbersHelper(); + + template + static void TestIncorrectWidthParameter(const T& input); +}; + +class IcebergBucketPartitionTransformTests { +public: + static void TestIntegerNumbers(); + + static void TestString(); + + static void TestDecimal(); + + static void TestDate(); + + static void TestTimestamp(); +private: + template + static void TestIntegerNumbersHelper(); + + template + static void TestIncorrectWidthParameter(const T& input); +}; + +void IcebergTruncatePartitionTransformTests::TestIntegerNumbers() { + IcebergTruncatePartitionTransformTests::TestIntegerNumbersHelper(); + IcebergTruncatePartitionTransformTests::TestIntegerNumbersHelper(); + + // Check that BigInt width is valid. + int64_t max_value64 = std::numeric_limits::max(); + BigIntVal ret_val = IcebergFunctions::TruncatePartitionTransform(nullptr, + BigIntVal(max_value64), BigIntVal(max_value64)); + EXPECT_EQ(max_value64, ret_val.val); +} + +template +void IcebergTruncatePartitionTransformTests::TestIntegerNumbersHelper() { + // Test positive inputs. + T ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, T(15), T(10)); + EXPECT_EQ(10, ret_val.val); + + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, T(15), T(4)); + EXPECT_EQ(12, ret_val.val); + + // Test negative inputs. + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, T(-1), T(10)); + EXPECT_EQ(-10, ret_val.val); + + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, T(-15), T(4)); + EXPECT_EQ(-16, ret_val.val); + + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, T(-15), T(1)); + EXPECT_EQ(-15, ret_val.val); + + // Test NULL input. + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, T::null(), T(10)); + EXPECT_TRUE(ret_val.is_null); + + // Check for overflow when truncating from min value + FunctionContext* ctx = CreateFunctionContext(); + typename T::underlying_type_t num_min = + std::numeric_limits::min(); + ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, T(num_min), T(10000)); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Truncate operation overflows for the given input.", + ctx->error_msg()), 0); + ctx->impl()->Close(); + + TestIncorrectWidthParameter(T(0)); +} + +void IcebergTruncatePartitionTransformTests::TestString() { + MemTracker m; + MemPool pool(&m); + StringVal input("Some test input"); + FunctionContext* ctx = CreateFunctionContext(&pool); + + int width = 10; + StringVal ret_val = IcebergFunctions::TruncatePartitionTransform( + ctx, input, IntVal(width)); + EXPECT_EQ(ret_val.len, width); + EXPECT_EQ(strncmp((char*)input.ptr, (char*)ret_val.ptr, width), 0); + + // Truncate width is longer than the input string. + ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, 100); + EXPECT_EQ(ret_val.len, input.len); + EXPECT_EQ(strncmp((char*)input.ptr, (char*)ret_val.ptr, input.len), 0); + + // Truncate width is the same as the the input string length. + ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, input.len); + EXPECT_EQ(ret_val.len, input.len); + EXPECT_EQ(strncmp((char*)input.ptr, (char*)ret_val.ptr, input.len), 0); + + // Test NULL input. + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, StringVal::null(), IntVal(10)); + EXPECT_TRUE(ret_val.is_null); + + // Test empty string input. + ctx = CreateFunctionContext(&pool); + ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, "", 10); + EXPECT_EQ(ret_val.len, 0); + + TestIncorrectWidthParameter(StringVal("input")); + + pool.FreeAll(); +} + +void IcebergTruncatePartitionTransformTests::TestDecimal() { + // Testing decimal in unit tests by invoking TruncatePartitionTransform seems + // problematic as it queries ARG_TYPE_SIZE from FunctionContext. Apparently, it is not + // properly set in unit tests. Use TruncateDecimal instead that gets the size as + // parameter. + DecimalVal ret_val = IcebergFunctions::TruncateDecimal(nullptr, 10050, 100, 4); + EXPECT_EQ(10000, ret_val.val4); + + ret_val = IcebergFunctions::TruncateDecimal(nullptr, 100, 8, 4); + EXPECT_EQ(96, ret_val.val4); + + ret_val = IcebergFunctions::TruncateDecimal(nullptr, 0, 10, 4); + EXPECT_EQ(0, ret_val.val4); + + ret_val = IcebergFunctions::TruncateDecimal(nullptr, -1, 10, 4); + EXPECT_EQ(-10, ret_val.val4); + + ret_val = IcebergFunctions::TruncateDecimal(nullptr, 12345, 1, 4); + EXPECT_EQ(12345, ret_val.val4); + + // Test BigInt width with positive int32 Decimal inputs. + ret_val = IcebergFunctions::TruncateDecimal(nullptr, 12345, 123456789012, 4); + EXPECT_EQ(0, ret_val.val4); + + ret_val = IcebergFunctions::TruncateDecimal(nullptr, 12345, 100000000001, 4); + EXPECT_EQ(0, ret_val.val4); + + // Test BigInt width with negative int32 Decimal inputs. In this case int32 is not big + // enough to store the result and an overflow error is expected. + FunctionContext* ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::TruncateDecimal(ctx, -12345, 123456789012, 4); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Truncate operation overflows for the given input.", + ctx->error_msg()), 0); + ctx->impl()->Close(); + + ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::TruncateDecimal(ctx, -12345, 100000000001, 4); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Truncate operation overflows for the given input.", + ctx->error_msg()), 0); + ctx->impl()->Close(); + + // Test the maximum limit for each representation size. + // Note, decimal type represent int32 max in 8 bytes, int64 max in 16 bytes. + int32_t int32_max_value = std::numeric_limits::max(); + ret_val = IcebergFunctions::TruncateDecimal(nullptr, int32_max_value, 50, 8); + EXPECT_EQ(int32_max_value - (int32_max_value % 50), ret_val.val4); + + int64_t int64_max_value = std::numeric_limits::max(); + ret_val = IcebergFunctions::TruncateDecimal(nullptr, int64_max_value, 50, 16); + EXPECT_EQ(int64_max_value - (int64_max_value % 50), ret_val.val8); + + __int128_t int128_max_value = std::numeric_limits<__int128_t>::max(); + ret_val = IcebergFunctions::TruncateDecimal(nullptr, int128_max_value, 50, 16); + EXPECT_EQ(int128_max_value - (int128_max_value % 50), ret_val.val16); + + // Test the minimum limit for each representation size. + // Note, decimal type represent int32 min in 8 bytes, int64 min in 16 bytes. + int32_t int32_min_value = std::numeric_limits::min(); + int width = 50; + ret_val = IcebergFunctions::TruncateDecimal(nullptr, int32_min_value, width, 8); + int64_t expected64 = (int64_t)int32_min_value - (width + (int32_min_value % width)); + EXPECT_EQ(expected64, ret_val.val8); + + int64_t int64_min_value = std::numeric_limits::min(); + ret_val = IcebergFunctions::TruncateDecimal(nullptr, int64_min_value, width, 16); + __int128_t expected128 = + (__int128_t)int64_min_value - (width + (int64_min_value % width)); + EXPECT_EQ(expected128, ret_val.val16); + + // Truncation from int128 minimum value causes an overflow. + ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::TruncateDecimal(ctx, GetMinForInt128(), width, 16); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Truncate operation overflows for the given input.", + ctx->error_msg()), 0); + ctx->impl()->Close(); + + // Test NULL input. + ret_val = IcebergFunctions::TruncatePartitionTransform( + nullptr, DecimalVal::null(), BigIntVal(10)); + EXPECT_TRUE(ret_val.is_null); + + TestIncorrectWidthParameter(DecimalVal(0)); + TestIncorrectWidthParameter(DecimalVal(0)); +} + +template +void IcebergTruncatePartitionTransformTests::TestIncorrectWidthParameter( + const T& input) { + // Check for error when width is zero. + FunctionContext* ctx = CreateFunctionContext(); + T ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, W(0)); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0); + ctx->impl()->Close(); + + // Check for error when width is negative. + ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, W(-1)); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0); + ctx->impl()->Close(); + + // Check for error when width is null. + ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, W::null()); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0); + ctx->impl()->Close(); +} + +void IcebergBucketPartitionTransformTests::TestIntegerNumbers() { + TestIntegerNumbersHelper(); + TestIntegerNumbersHelper(); + + // Check for max input value. + IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + IntVal(std::numeric_limits::max()), 100); + EXPECT_EQ(6, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + BigIntVal(std::numeric_limits::max()), 100); + EXPECT_EQ(99, ret_val.val); + + // Check for min input value. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + IntVal(std::numeric_limits::min()), 1000); + EXPECT_EQ(856, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + BigIntVal(std::numeric_limits::min()), 1000); + EXPECT_EQ(829, ret_val.val); +} + +template +void IcebergBucketPartitionTransformTests::TestIntegerNumbersHelper() { + IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(0), 1000000); + EXPECT_EQ(671676, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(0), 100); + EXPECT_EQ(76, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(12345), 50); + EXPECT_EQ(41, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(12345), 12345); + EXPECT_EQ(12311, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(-987654321), 14500); + EXPECT_EQ(7489, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(-155), 100); + EXPECT_EQ(99, ret_val.val); + + // Check for null input. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T::null(), 1); + EXPECT_TRUE(ret_val.is_null); + + TestIncorrectWidthParameter(T(0)); +} + +void IcebergBucketPartitionTransformTests::TestString() { + IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, "Input string", + 100); + EXPECT_EQ(52, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, "Input string", + 12345); + EXPECT_EQ(5717, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + " ---====1234567890====--- ", 100); + EXPECT_EQ(27, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + " ---====1234567890====--- ", 654321); + EXPECT_EQ(641267, ret_val.val); + + // Check for empty string input. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, "", 10); + EXPECT_EQ(0, ret_val.val); + + // Check for null input. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, StringVal::null(), 1); + EXPECT_TRUE(ret_val.is_null); + + TestIncorrectWidthParameter(StringVal("input")); +} + +void IcebergBucketPartitionTransformTests::TestDecimal() { + // Testing decimal in unit tests by invoking BucketPartitionTransform seems + // problematic as it queries ARG_TYPE_SIZE from FunctionContext. Apparently, it is not + // properly set in unit tests. Use BucketDecimal instead that skips checking function + // context for ARG_TYPE_SIZE. + + // Test int32 based decimal inputs. + IntVal ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345, 100, 4); + EXPECT_EQ(28, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345, 555, 4); + EXPECT_EQ(393, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, 2047, 900, 4); + EXPECT_EQ(439, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, 0, 10, 4); + EXPECT_EQ(7, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, -987654321, 15000, 4); + EXPECT_EQ(8732, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, -987654321, 2000, 4); + EXPECT_EQ(1732, ret_val.val); + + // Test int64 based decimal inputs. + ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345678901L, 10000, 8); + EXPECT_EQ(5191, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345678901L, 150, 8); + EXPECT_EQ(41, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, 330011994488229955L, 4800, 8); + EXPECT_EQ(1114, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, 330011994488229955L, 73400, 8); + EXPECT_EQ(12914, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, -360111674415228955L, 5000, 8); + EXPECT_EQ(187, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal(nullptr, -360111674415228955L, 9800, 8); + EXPECT_EQ(9187, ret_val.val); + + // Test int128 based decimal inputs. + ret_val = IcebergFunctions::BucketDecimal( + nullptr, __int128_t(9223372036854775807) + 1, 5550, 16); + EXPECT_EQ(651, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal( + nullptr, __int128_t(9223372036854775807) + 1, 18000, 16); + EXPECT_EQ(12951, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal( + nullptr, __int128_t(123321456654789987) * 1000, 18000, 16); + EXPECT_EQ(15169, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal( + nullptr, __int128_t(-123321456654789987) * 1000, 11000, 16); + EXPECT_EQ(8353, ret_val.val); + + ret_val = IcebergFunctions::BucketDecimal( + nullptr, __int128_t(-123321456654789987) * 1000, 7300, 16); + EXPECT_EQ(853, ret_val.val); + + // Test NULL input. + ret_val = IcebergFunctions::BucketPartitionTransform( + nullptr, DecimalVal::null(), IntVal(10)); + EXPECT_TRUE(ret_val.is_null); + + TestIncorrectWidthParameter(DecimalVal(0)); + TestIncorrectWidthParameter(DecimalVal(0)); +} + +void IcebergBucketPartitionTransformTests::TestDate() { + IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + DateValue(2017, 11, 16).ToDateVal(), 5000); + EXPECT_EQ(3226, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + DateValue(2017, 11, 16).ToDateVal(), 150); + EXPECT_EQ(76, ret_val.val); + + // Test for values pre 1970. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + DateValue(1901, 12, 5).ToDateVal(), 30); + EXPECT_EQ(13, ret_val.val); + + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, + DateValue(50, 3, 11).ToDateVal(), 1900); + EXPECT_EQ(1539, ret_val.val); + + // Check for null input. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, DateVal::null(), 1); + EXPECT_TRUE(ret_val.is_null); + + TestIncorrectWidthParameter(DateVal(0)); +} + +void IcebergBucketPartitionTransformTests::TestTimestamp() { + TimestampVal tv; + TimestampValue::ParseSimpleDateFormat("2017-11-16 22:31:08").ToTimestampVal(&tv); + IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 100); + EXPECT_EQ(7, ret_val.val); + + TimestampValue::ParseSimpleDateFormat("2017-11-16 22:31:08").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 1150); + EXPECT_EQ(957, ret_val.val); + + // Check that zero fractional seconts doesn't change the output. + TimestampValue::ParseSimpleDateFormat( + "2017-11-16 22:31:08.000000").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 100); + EXPECT_EQ(7, ret_val.val); + + TimestampValue::ParseSimpleDateFormat( + "2017-11-16 22:31:08.000000000").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 100); + EXPECT_EQ(7, ret_val.val); + + // Checks for non-zero fractional seconds. + TimestampValue::ParseSimpleDateFormat("2010-10-09 12:39:28.123").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 250); + EXPECT_EQ(107, ret_val.val); + + TimestampValue::ParseSimpleDateFormat( + "2010-10-09 12:39:28.123456").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 250); + EXPECT_EQ(115, ret_val.val); + + // Check that the 7th, 8th, 9th digit if the fractional second doesn't have effect on + // the result. This is to follow Iceberg's behaviour. + TimestampValue::ParseSimpleDateFormat( + "2010-10-09 12:39:28.123456789").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 250); + EXPECT_EQ(115, ret_val.val); + + // Test for values pre 1970. + TimestampValue::ParseSimpleDateFormat( + "1905-01-02 12:20:25").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 2000); + EXPECT_EQ(809, ret_val.val); + + TimestampValue::ParseSimpleDateFormat( + "1905-01-02 12:20:25.123456").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 2000); + EXPECT_EQ(932, ret_val.val); + + TimestampValue::ParseSimpleDateFormat( + "1905-01-02 12:20:25.123456789").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 2000); + EXPECT_EQ(932, ret_val.val); + + TimestampValue::ParseSimpleDateFormat( + "1905-01-02 12:20:25").ToTimestampVal(&tv); + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 8000); + EXPECT_EQ(2809, ret_val.val); + + // Check for null input. + ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, TimestampVal::null(), 1); + EXPECT_TRUE(ret_val.is_null); + + TestIncorrectWidthParameter(TimestampVal(0)); +} + +template +void IcebergBucketPartitionTransformTests::TestIncorrectWidthParameter(const T& input) { + // Check for error when width is zero. + FunctionContext* ctx = CreateFunctionContext(); + IntVal ret_val = IcebergFunctions::BucketPartitionTransform(ctx, input, 0); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0); + ctx->impl()->Close(); + + // Check for error when width is negative. + ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::BucketPartitionTransform(ctx, input, -1); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0); + ctx->impl()->Close(); + + // Check for error when width is null. + ctx = CreateFunctionContext(); + ret_val = IcebergFunctions::BucketPartitionTransform(ctx, input, IntVal::null()); + EXPECT_TRUE(ret_val.is_null); + EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0); + ctx->impl()->Close(); +} + + +TEST(TestIcebergFunctions, TruncateTransform) { + IcebergTruncatePartitionTransformTests::TestIntegerNumbers(); + IcebergTruncatePartitionTransformTests::TestString(); + IcebergTruncatePartitionTransformTests::TestDecimal(); +} + +TEST(TestIcebergFunctions, BucketTransform) { + IcebergBucketPartitionTransformTests::TestIntegerNumbers(); + IcebergBucketPartitionTransformTests::TestString(); + IcebergBucketPartitionTransformTests::TestDecimal(); + IcebergBucketPartitionTransformTests::TestDate(); + IcebergBucketPartitionTransformTests::TestTimestamp(); +} + +} diff --git a/be/src/exprs/iceberg-functions.h b/be/src/exprs/iceberg-functions.h new file mode 100644 index 000000000..01aa0e640 --- /dev/null +++ b/be/src/exprs/iceberg-functions.h @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "udf/udf.h" + +#include +#include + +namespace impala { + +using impala_udf::BigIntVal; +using impala_udf::DateVal; +using impala_udf::DecimalVal; +using impala_udf::FunctionContext; +using impala_udf::IntVal; +using impala_udf::StringVal; +using impala_udf::TimestampVal; + +/// This class holds functions that are related to Iceberg functionality. +/// E.g. implementations of partition transforms. +class IcebergFunctions { +public: + /// The following functions implement the truncate partition transform that + /// partitioned Iceberg tables use. + static IntVal TruncatePartitionTransform(FunctionContext* ctx, + const IntVal& input, const IntVal& width); + static BigIntVal TruncatePartitionTransform(FunctionContext* ctx, + const BigIntVal& input, const BigIntVal& width); + static DecimalVal TruncatePartitionTransform(FunctionContext* ctx, + const DecimalVal& input, const IntVal& width); + static DecimalVal TruncatePartitionTransform(FunctionContext* ctx, + const DecimalVal& input, const BigIntVal& width); + static StringVal TruncatePartitionTransform(FunctionContext* ctx, + const StringVal& input, const IntVal& width); + + /// The following functions implement the bucket partition transform that + /// partitioned Iceberg tables use. + static IntVal BucketPartitionTransform(FunctionContext* ctx, + const IntVal& input, const IntVal& width); + static IntVal BucketPartitionTransform(FunctionContext* ctx, + const BigIntVal& input, const IntVal& width); + static IntVal BucketPartitionTransform(FunctionContext* ctx, + const DecimalVal& input, const IntVal& width); + static IntVal BucketPartitionTransform(FunctionContext* ctx, + const StringVal& input, const IntVal& width); + static IntVal BucketPartitionTransform(FunctionContext* ctx, + const DateVal& input, const IntVal& width); + static IntVal BucketPartitionTransform(FunctionContext* ctx, + const TimestampVal& input, const IntVal& width); +private: + friend class IcebergTruncatePartitionTransformTests; + friend class IcebergBucketPartitionTransformTests; + + /// Error message to raise when width parameter is not positive. + static const std::string INCORRECT_WIDTH_ERROR_MSG; + + /// Error message to raise when truncate overflows for an int input. + static const std::string TRUNCATE_OVERFLOW_ERROR_MSG; + + /// The seed used for bucket transform. + static const unsigned BUCKET_TRANSFORM_SEED; + + /// Checks 'input' for null, 'width' for null and negative values. Returns false if any + /// of the checks fail, true otherwise. If any of the width checks fail ths also sets + /// an error message in 'ctx'. + template + static bool CheckInputsAndSetError(FunctionContext* ctx, const T& input, + const W& width); + + template + static T TruncatePartitionTransformNumericImpl(FunctionContext* ctx, const T& input, + const W& width); + + /// Helper function for TruncatePartitionTransform for Decimals where the size of the + /// decimal representation is given as a parameter. + static DecimalVal TruncateDecimal(FunctionContext* ctx, const DecimalVal& input, + const BigIntVal& width, int decimal_size); + + template + static DecimalVal TruncatePartitionTransformDecimalImpl(const T& decimal_val, + int64_t width); + + template + static IntVal BucketPartitionTransformNumericImpl(FunctionContext* ctx, const T& input, + const W& width); + + /// Helper function for BucketPartitionTransform for Decimals where the size of the + /// decimal representation is given as a parameter. + static IntVal BucketDecimal(FunctionContext* ctx, const DecimalVal& input, + const IntVal& width, int decimal_size); +}; + +} diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc index e08d518d1..1f667da23 100644 --- a/be/src/exprs/scalar-expr-evaluator.cc +++ b/be/src/exprs/scalar-expr-evaluator.cc @@ -28,11 +28,12 @@ #include "exprs/cast-functions.h" #include "exprs/compound-predicates.h" #include "exprs/conditional-functions.h" -#include "datasketches-functions.h" +#include "exprs/datasketches-functions.h" #include "exprs/date-functions.h" #include "exprs/decimal-functions.h" #include "exprs/decimal-operators.h" #include "exprs/hive-udf-call.h" +#include "exprs/iceberg-functions.h" #include "exprs/in-predicate.h" #include "exprs/is-not-empty-predicate.h" #include "exprs/is-null-predicate.h" @@ -435,6 +436,7 @@ void ScalarExprEvaluator::InitBuiltinsDummy() { DataSketchesFunctions::DsHllEstimate(nullptr, StringVal::null()); DecimalFunctions::Precision(nullptr, DecimalVal::null()); DecimalOperators::CastToDecimalVal(nullptr, DecimalVal::null()); + IcebergFunctions::TruncatePartitionTransform(nullptr, IntVal::null(), IntVal::null()); InPredicate::InIterate(nullptr, BigIntVal::null(), 0, nullptr); IsNullPredicate::IsNull(nullptr, BooleanVal::null()); LikePredicate::Like(nullptr, StringVal::null(), StringVal::null()); diff --git a/be/src/thirdparty/murmurhash/MurmurHash3.cpp b/be/src/thirdparty/murmurhash/MurmurHash3.cpp new file mode 100644 index 000000000..aa7982d3e --- /dev/null +++ b/be/src/thirdparty/murmurhash/MurmurHash3.cpp @@ -0,0 +1,335 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the public +// domain. The author hereby disclaims copyright to this source code. + +// Note - The x86 and x64 versions do _not_ produce the same results, as the +// algorithms are optimized for their respective platforms. You can still +// compile and run any of them on any platform, but your performance with the +// non-native version will be less than optimal. + +#include "MurmurHash3.h" + +//----------------------------------------------------------------------------- +// Platform-specific functions and macros + +// Microsoft Visual Studio + +#if defined(_MSC_VER) + +#define FORCE_INLINE __forceinline + +#include + +#define ROTL32(x,y) _rotl(x,y) +#define ROTL64(x,y) _rotl64(x,y) + +#define BIG_CONSTANT(x) (x) + +// Other compilers + +#else // defined(_MSC_VER) + +#define FORCE_INLINE inline __attribute__((always_inline)) + +inline uint32_t rotl32 ( uint32_t x, int8_t r ) +{ + return (x << r) | (x >> (32 - r)); +} + +inline uint64_t rotl64 ( uint64_t x, int8_t r ) +{ + return (x << r) | (x >> (64 - r)); +} + +#define ROTL32(x,y) rotl32(x,y) +#define ROTL64(x,y) rotl64(x,y) + +#define BIG_CONSTANT(x) (x##LLU) + +#endif // !defined(_MSC_VER) + +//----------------------------------------------------------------------------- +// Block read - if your platform needs to do endian-swapping or can only +// handle aligned reads, do the conversion here + +FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i ) +{ + return p[i]; +} + +FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i ) +{ + return p[i]; +} + +//----------------------------------------------------------------------------- +// Finalization mix - force all bits of a hash block to avalanche + +FORCE_INLINE uint32_t fmix32 ( uint32_t h ) +{ + h ^= h >> 16; + h *= 0x85ebca6b; + h ^= h >> 13; + h *= 0xc2b2ae35; + h ^= h >> 16; + + return h; +} + +//---------- + +FORCE_INLINE uint64_t fmix64 ( uint64_t k ) +{ + k ^= k >> 33; + k *= BIG_CONSTANT(0xff51afd7ed558ccd); + k ^= k >> 33; + k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53); + k ^= k >> 33; + + return k; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_32 ( const void * key, int len, + uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 4; + + uint32_t h1 = seed; + + const uint32_t c1 = 0xcc9e2d51; + const uint32_t c2 = 0x1b873593; + + //---------- + // body + + const uint32_t * blocks = (const uint32_t *)(data + nblocks*4); + + for(int i = -nblocks; i; i++) + { + uint32_t k1 = getblock32(blocks,i); + + k1 *= c1; + k1 = ROTL32(k1,15); + k1 *= c2; + + h1 ^= k1; + h1 = ROTL32(h1,13); + h1 = h1*5+0xe6546b64; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*4); + + uint32_t k1 = 0; + + switch(len & 3) + { + case 3: k1 ^= tail[2] << 16; + case 2: k1 ^= tail[1] << 8; + case 1: k1 ^= tail[0]; + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; + + h1 = fmix32(h1); + + *(uint32_t*)out = h1; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_128 ( const void * key, const int len, + uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 16; + + uint32_t h1 = seed; + uint32_t h2 = seed; + uint32_t h3 = seed; + uint32_t h4 = seed; + + const uint32_t c1 = 0x239b961b; + const uint32_t c2 = 0xab0e9789; + const uint32_t c3 = 0x38b34ae5; + const uint32_t c4 = 0xa1e38b93; + + //---------- + // body + + const uint32_t * blocks = (const uint32_t *)(data + nblocks*16); + + for(int i = -nblocks; i; i++) + { + uint32_t k1 = getblock32(blocks,i*4+0); + uint32_t k2 = getblock32(blocks,i*4+1); + uint32_t k3 = getblock32(blocks,i*4+2); + uint32_t k4 = getblock32(blocks,i*4+3); + + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + + h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b; + + k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2; + + h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747; + + k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3; + + h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35; + + k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4; + + h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*16); + + uint32_t k1 = 0; + uint32_t k2 = 0; + uint32_t k3 = 0; + uint32_t k4 = 0; + + switch(len & 15) + { + case 15: k4 ^= tail[14] << 16; + case 14: k4 ^= tail[13] << 8; + case 13: k4 ^= tail[12] << 0; + k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4; + + case 12: k3 ^= tail[11] << 24; + case 11: k3 ^= tail[10] << 16; + case 10: k3 ^= tail[ 9] << 8; + case 9: k3 ^= tail[ 8] << 0; + k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3; + + case 8: k2 ^= tail[ 7] << 24; + case 7: k2 ^= tail[ 6] << 16; + case 6: k2 ^= tail[ 5] << 8; + case 5: k2 ^= tail[ 4] << 0; + k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2; + + case 4: k1 ^= tail[ 3] << 24; + case 3: k1 ^= tail[ 2] << 16; + case 2: k1 ^= tail[ 1] << 8; + case 1: k1 ^= tail[ 0] << 0; + k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len; + + h1 += h2; h1 += h3; h1 += h4; + h2 += h1; h3 += h1; h4 += h1; + + h1 = fmix32(h1); + h2 = fmix32(h2); + h3 = fmix32(h3); + h4 = fmix32(h4); + + h1 += h2; h1 += h3; h1 += h4; + h2 += h1; h3 += h1; h4 += h1; + + ((uint32_t*)out)[0] = h1; + ((uint32_t*)out)[1] = h2; + ((uint32_t*)out)[2] = h3; + ((uint32_t*)out)[3] = h4; +} + +//----------------------------------------------------------------------------- + +void MurmurHash3_x64_128 ( const void * key, const int len, + const uint32_t seed, void * out ) +{ + const uint8_t * data = (const uint8_t*)key; + const int nblocks = len / 16; + + uint64_t h1 = seed; + uint64_t h2 = seed; + + const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5); + const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f); + + //---------- + // body + + const uint64_t * blocks = (const uint64_t *)(data); + + for(int i = 0; i < nblocks; i++) + { + uint64_t k1 = getblock64(blocks,i*2+0); + uint64_t k2 = getblock64(blocks,i*2+1); + + k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1; + + h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729; + + k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2; + + h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5; + } + + //---------- + // tail + + const uint8_t * tail = (const uint8_t*)(data + nblocks*16); + + uint64_t k1 = 0; + uint64_t k2 = 0; + + switch(len & 15) + { + case 15: k2 ^= ((uint64_t)tail[14]) << 48; + case 14: k2 ^= ((uint64_t)tail[13]) << 40; + case 13: k2 ^= ((uint64_t)tail[12]) << 32; + case 12: k2 ^= ((uint64_t)tail[11]) << 24; + case 11: k2 ^= ((uint64_t)tail[10]) << 16; + case 10: k2 ^= ((uint64_t)tail[ 9]) << 8; + case 9: k2 ^= ((uint64_t)tail[ 8]) << 0; + k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2; + + case 8: k1 ^= ((uint64_t)tail[ 7]) << 56; + case 7: k1 ^= ((uint64_t)tail[ 6]) << 48; + case 6: k1 ^= ((uint64_t)tail[ 5]) << 40; + case 5: k1 ^= ((uint64_t)tail[ 4]) << 32; + case 4: k1 ^= ((uint64_t)tail[ 3]) << 24; + case 3: k1 ^= ((uint64_t)tail[ 2]) << 16; + case 2: k1 ^= ((uint64_t)tail[ 1]) << 8; + case 1: k1 ^= ((uint64_t)tail[ 0]) << 0; + k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1; + }; + + //---------- + // finalization + + h1 ^= len; h2 ^= len; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + ((uint64_t*)out)[0] = h1; + ((uint64_t*)out)[1] = h2; +} + +//----------------------------------------------------------------------------- + diff --git a/be/src/thirdparty/murmurhash/MurmurHash3.h b/be/src/thirdparty/murmurhash/MurmurHash3.h new file mode 100644 index 000000000..20790b9fd --- /dev/null +++ b/be/src/thirdparty/murmurhash/MurmurHash3.h @@ -0,0 +1,40 @@ +//----------------------------------------------------------------------------- +// MurmurHash3 was written by Austin Appleby, and is placed in the public +// domain. The author hereby disclaims copyright to this source code. + +// The name of the include guard had to be changed to avoid collision with Datasketches +// version of murmurhash algorithms in be/src/thirdparty/datasketches/MurmurHash3.h + +#ifndef _APPLEBY_MURMURHASH3_H_ +#define _APPLEBY_MURMURHASH3_H_ + +//----------------------------------------------------------------------------- +// Platform-specific functions and macros + +// Microsoft Visual Studio + +#if defined(_MSC_VER) && (_MSC_VER < 1600) + +typedef unsigned char uint8_t; +typedef unsigned int uint32_t; +typedef unsigned __int64 uint64_t; + +// Other compilers + +#else // defined(_MSC_VER) + +#include + +#endif // !defined(_MSC_VER) + +//----------------------------------------------------------------------------- + +void MurmurHash3_x86_32 ( const void * key, int len, uint32_t seed, void * out ); + +void MurmurHash3_x86_128 ( const void * key, int len, uint32_t seed, void * out ); + +void MurmurHash3_x64_128 ( const void * key, int len, uint32_t seed, void * out ); + +//----------------------------------------------------------------------------- + +#endif // _MURMURHASH3_H_ diff --git a/be/src/thirdparty/murmurhash/README.md b/be/src/thirdparty/murmurhash/README.md new file mode 100644 index 000000000..856e83ab2 --- /dev/null +++ b/be/src/thirdparty/murmurhash/README.md @@ -0,0 +1,9 @@ +This library contains implementations for different variations of murmurhash. This is +basically a copy-paste from an external source: https://github.com/aappleby/smhasher. + +I used the following commit for our snapshot in Impala: +https://github.com/aappleby/smhasher/tree/92cf3702fcfaadc84eb7bef59825a23e0cd84f56/ + +Note, the name of the include guard had to be changed to avoid collision with +Datasketches version of murmurhash algorithms in +be/src/thirdparty/datasketches/MurmurHash3.h diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc index 9f974cad4..f602603ba 100644 --- a/be/src/util/bit-util-test.cc +++ b/be/src/util/bit-util-test.cc @@ -338,5 +338,75 @@ TEST(BitUtil, CpuInfoIsSupportedHoist) { } #endif +template +void RunIntToByteArrayTest(const T& input, const std::vector& expected) { + std::string result = BitUtil::IntToByteBuffer(input); + EXPECT_EQ(expected.size(), result.size()) << input; + for (int i = 0; i < std::min(expected.size(), result.size()); ++i) { + EXPECT_EQ(expected[i], result[i]) << input; + } +} + +// __int128_t has no support for << operator. This is a specialized function for this +// type where in case of an EXPECT check failing we don't print the original input. +template<> +void RunIntToByteArrayTest(const __int128_t& input, const std::vector& expected) { + std::string result = BitUtil::IntToByteBuffer(input); + EXPECT_EQ(expected.size(), result.size()); + for (int i = 0; i < std::min(expected.size(), result.size()); ++i) { + EXPECT_EQ(expected[i], result[i]); + } +} + +// The expected results come from running Java's BigInteger.toByteArray(). +TEST(BitUtil, IntToByteArray) { + // Test int32 inputs. + RunIntToByteArrayTest(0, {0}); + RunIntToByteArrayTest(127, {127}); + RunIntToByteArrayTest(128, {0, -128}); + RunIntToByteArrayTest(255, {0, -1}); + RunIntToByteArrayTest(256, {1, 0}); + RunIntToByteArrayTest(65500, {0, -1, -36}); + RunIntToByteArrayTest(123456123, {7, 91, -54, 123}); + RunIntToByteArrayTest(2147483647, {127, -1, -1, -1}); + RunIntToByteArrayTest(-1, {-1}); + RunIntToByteArrayTest(-128, {-128}); + RunIntToByteArrayTest(-129, {-1, 127}); + RunIntToByteArrayTest(-1024, {-4, 0}); + RunIntToByteArrayTest(-1025, {-5, -1}); + RunIntToByteArrayTest(-40000, {-1, 99, -64}); + RunIntToByteArrayTest(-68000, {-2, -10, 96}); + RunIntToByteArrayTest(-654321321, {-40, -1, -39, 87}); + RunIntToByteArrayTest(-2147483647, {-128, 0, 0, 1}); + RunIntToByteArrayTest(-2147483648, {-128, 0, 0, 0}); + + // Test int64 inputs. + RunIntToByteArrayTest(2147483648, {0, -128, 0, 0, 0}); + RunIntToByteArrayTest(2147489999, {0, -128, 0, 24, -49}); + RunIntToByteArrayTest(123498764226421, {112, 82, 75, -8, -49, 117}); + RunIntToByteArrayTest(45935528764226421, {0, -93, 50, 30, -70, -113, -69, 117}); + RunIntToByteArrayTest(9223372036854775807, {127, -1, -1, -1, -1, -1, -1, -1}); + RunIntToByteArrayTest(-2147483649, {-1, 127, -1, -1, -1}); + RunIntToByteArrayTest(-2147483650, {-1, 127, -1, -1, -2}); + RunIntToByteArrayTest(-226103951038195, {-1, 50, 92, 18, 80, -23, 13}); + RunIntToByteArrayTest(-18237603371852591, {-65, 52, -1, 17, 119, 92, -47}); + RunIntToByteArrayTest(-48227503771052199, {-1, 84, -87, 87, 65, 82, -105, 89}); + RunIntToByteArrayTest(-9223372036854775807, {-128, 0, 0, 0, 0, 0, 0, 1}); + // C++ compiler doesn't accept -9223372036854775808 (int64_t min) as a valid constant. + RunIntToByteArrayTest(-9223372036854775807 - 1, {-128, 0, 0, 0, 0, 0, 0, 0}); + + // Test int128 inputs. + RunIntToByteArrayTest(__int128_t(9223372036854775807) + 1, + {0, -128, 0, 0, 0, 0, 0, 0, 0}); + RunIntToByteArrayTest(__int128_t(9223372036854775807) + 2, + {0, -128, 0, 0, 0, 0, 0, 0, 1}); + RunIntToByteArrayTest(__int128_t(123321456654789987) * 1000, + {6, -81, 109, -45, 113, 68, 125, 74, -72}); + RunIntToByteArrayTest(__int128_t(7255211462147863907) * 120000, + {0, -72, 92, -78, 56, 127, -30, 94, 113, 6, 64}); + RunIntToByteArrayTest(__int128_t(7255211462147863907) * 180000, + {1, 20, -117, 11, 84, -65, -45, -115, -87, -119, 96}); +} + } diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h index ffdbccc82..2ab783b99 100644 --- a/be/src/util/bit-util.h +++ b/be/src/util/bit-util.h @@ -23,9 +23,11 @@ #include #endif +#include #include #include #include +#include #include "common/compiler-util.h" #include "common/logging.h" @@ -341,6 +343,33 @@ class BitUtil { return floor + 1; } } + + /// The purpose of this function is to replicate Java's BigInteger.toByteArray() + /// function. Receives an int input (it can be any size) and returns a buffer that + /// represents the byte sequence representation of this number where the most + /// significant byte is in the zeroth element. + /// Note, the return value is stored in a string instead of in a vector of chars for + /// potential optimisations with small strings. + /// E.g. 520 = [2, 8] + /// E.g. 12065530 = [0, -72, 26, -6] + /// E.g. -129 = [-1, 127] + template + static std::string IntToByteBuffer(T input) { + std::string buffer; + T value = input; + for (int i = 0; i < sizeof(value); ++i) { + // Applies a mask for a byte range on the input. + char value_to_save = value & 0XFF; + buffer.push_back(value_to_save); + // Remove the just processed part from the input so that we can exit early if there + // is nothing left to process. + value >>= 8; + if (value == 0 && value_to_save >= 0) break; + if (value == -1 && value_to_save < 0) break; + } + std::reverse(buffer.begin(), buffer.end()); + return buffer; + } }; /// An encapsulation class of SIMD byteswap functions diff --git a/bin/rat_exclude_files.txt b/bin/rat_exclude_files.txt index 26ec90db4..96a1bc333 100644 --- a/bin/rat_exclude_files.txt +++ b/bin/rat_exclude_files.txt @@ -30,6 +30,7 @@ testdata/cluster/node_templates/cdh7/etc/init.d/kms # See $IMPALA_HOME/LICENSE.txt be/src/gutil/* be/src/thirdparty/datasketches/* +be/src/thirdparty/murmurhash/* be/src/thirdparty/mpfit/* be/src/kudu/gutil www/highlight/* diff --git a/bin/run_clang_tidy.sh b/bin/run_clang_tidy.sh index f4264f0bd..2d6ff8175 100755 --- a/bin/run_clang_tidy.sh +++ b/bin/run_clang_tidy.sh @@ -42,7 +42,7 @@ DIRS=$(ls -d "${IMPALA_HOME}/be/src/"*/ | grep -v gutil | grep -v kudu |\ grep -v thirdparty | tr '\n' ' ') # Include/exclude select thirdparty dirs. DIRS=$DIRS$(ls -d "${IMPALA_HOME}/be/src/thirdparty/"*/ | grep -v mpfit |\ - grep -v datasketches | tr '\n' ' ') + grep -v datasketches | grep -v murmurhash | tr '\n' ' ') PIPE_DIRS=$(echo "${DIRS}" | tr ' ' '|') # Reduce the concurrency to one less than the number of cores in the system. Note than diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py index 6a484bf7d..a57685413 100644 --- a/common/function-registry/impala_functions.py +++ b/common/function-registry/impala_functions.py @@ -1001,4 +1001,29 @@ invisible_functions = [ [['notdistinct'], 'BOOLEAN', ['CHAR', 'CHAR'], 'impala::Operators::NotDistinct_Char_Char'], [['notdistinct'], 'BOOLEAN', ['DECIMAL', 'DECIMAL'], 'impala::DecimalOperators::NotDistinct_DecimalVal_DecimalVal'], [['notdistinct'], 'BOOLEAN', ['DATE', 'DATE'], 'impala::Operators::NotDistinct_DateVal_DateVal'], + + # Functions related to Apache Iceberg functionality. + [['iceberg_truncate_transform'], 'INT', ['INT', 'INT'], + '_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_6IntValES6_'], + [['iceberg_truncate_transform'], 'BIGINT', ['BIGINT', 'BIGINT'], + '_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_9BigIntValES6_'], + [['iceberg_truncate_transform'], 'DECIMAL', ['DECIMAL', 'INT'], + '_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_10DecimalValERKNS1_6IntValE'], + [['iceberg_truncate_transform'], 'DECIMAL', ['DECIMAL', 'BIGINT'], + '_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_10DecimalValERKNS1_9BigIntValE'], + [['iceberg_truncate_transform'], 'STRING', ['STRING', 'INT'], + '_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_6IntValE'], + + [['iceberg_bucket_transform'], 'INT', ['INT', 'INT'], + '_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_6IntValES6_'], + [['iceberg_bucket_transform'], 'INT', ['BIGINT', 'INT'], + '_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_9BigIntValERKNS1_6IntValE'], + [['iceberg_bucket_transform'], 'INT', ['DECIMAL', 'INT'], + '_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_10DecimalValERKNS1_6IntValE'], + [['iceberg_bucket_transform'], 'INT', ['STRING', 'INT'], + '_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_6IntValE'], + [['iceberg_bucket_transform'], 'INT', ['DATE', 'INT'], + '_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_7DateValERKNS1_6IntValE'], + [['iceberg_bucket_transform'], 'INT', ['TIMESTAMP', 'INT'], + '_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_12TimestampValERKNS1_6IntValE'], ]