IMPALA-10237: Support Bucket and Truncate partition transforms as built-in functions

This patch implements Truncate and Bucket partition transforms in
Impala BE as built-in functions. The expectation is that these
functions give the same result as Iceberg's implementation of the same
functions. These built-in functions are invisible so users won't be
able to invoke them e.g. from impala-shell.

Truncate:
  - Supported types are IntVal, BigIntVal, StringVal, DecimalVal.
  - Receives an input from the above types and a width.
  - Returns the same type as the input.
  - Expected behaviour is explained here:
    https://iceberg.apache.org/spec/#truncate-transform-details
Bucket:
  - Supported types are IntVal, BigIntVal, StringVal, DecimalVal,
    DateVal, TimestampVal.
  - Receives an input from the above types and the number of buckets as
    IntVal.
  - Returns IntVal.
  - Expected behaviour is explained here:
    https://iceberg.apache.org/spec/#bucket-transform-details

Change-Id: I485680cf79d96d578dd8cfbfd554bec468fe84bd
Reviewed-on: http://gerrit.cloudera.org:8080/16741
Reviewed-by: Gabor Kaszab <gaborkaszab@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
Gabor Kaszab
2020-11-18 15:29:20 +01:00
committed by Impala Public Jenkins
parent 296ed74d6f
commit 6cb7cecacf
14 changed files with 1438 additions and 2 deletions

View File

@@ -49,6 +49,7 @@
#include "exprs/decimal-functions-ir.cc" #include "exprs/decimal-functions-ir.cc"
#include "exprs/decimal-operators-ir.cc" #include "exprs/decimal-operators-ir.cc"
#include "exprs/hive-udf-call-ir.cc" #include "exprs/hive-udf-call-ir.cc"
#include "exprs/iceberg-functions-ir.cc"
#include "exprs/in-predicate-ir.cc" #include "exprs/in-predicate-ir.cc"
#include "exprs/is-null-predicate-ir.cc" #include "exprs/is-null-predicate-ir.cc"
#include "exprs/kudu-partition-expr-ir.cc" #include "exprs/kudu-partition-expr-ir.cc"

View File

@@ -22,6 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs")
# where to put generated binaries # where to put generated binaries
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exprs")
set(MURMURHASH_SRC_DIR "${CMAKE_SOURCE_DIR}/be/src/thirdparty/murmurhash")
add_library(Exprs add_library(Exprs
agg-fn.cc agg-fn.cc
agg-fn-evaluator.cc agg-fn-evaluator.cc
@@ -44,6 +46,7 @@ add_library(Exprs
expr.cc expr.cc
hive-udf-call.cc hive-udf-call.cc
hive-udf-call-ir.cc hive-udf-call-ir.cc
iceberg-functions-ir.cc
in-predicate-ir.cc in-predicate-ir.cc
is-not-empty-predicate.cc is-not-empty-predicate.cc
is-null-predicate-ir.cc is-null-predicate-ir.cc
@@ -54,6 +57,7 @@ add_library(Exprs
literal.cc literal.cc
mask-functions-ir.cc mask-functions-ir.cc
math-functions-ir.cc math-functions-ir.cc
${MURMURHASH_SRC_DIR}/MurmurHash3.cpp
null-literal.cc null-literal.cc
operators-ir.cc operators-ir.cc
scalar-expr.cc scalar-expr.cc
@@ -79,12 +83,14 @@ add_dependencies(Exprs gen-deps gen_ir_descriptions)
add_library(ExprsTests STATIC add_library(ExprsTests STATIC
datasketches-test.cc datasketches-test.cc
expr-test.cc expr-test.cc
iceberg-functions-test.cc
timezone_db-test.cc timezone_db-test.cc
) )
add_dependencies(ExprsTests gen-deps) add_dependencies(ExprsTests gen-deps)
ADD_UNIFIED_BE_LSAN_TEST(datasketches-test ADD_UNIFIED_BE_LSAN_TEST(datasketches-test
"TestDataSketchesHll.*:TestDataSketchesKll.*:TestDataSketchesCpc.*") "TestDataSketchesHll.*:TestDataSketchesKll.*:TestDataSketchesCpc.*")
ADD_UNIFIED_BE_LSAN_TEST(iceberg-functions-test "TestIcebergFunctions.*")
ADD_UNIFIED_BE_LSAN_TEST(expr-test "Instantiations/ExprTest.*") ADD_UNIFIED_BE_LSAN_TEST(expr-test "Instantiations/ExprTest.*")
# Exception to unified be tests: custom main initiailizes LLVM # Exception to unified be tests: custom main initiailizes LLVM
ADD_BE_LSAN_TEST(expr-codegen-test) ADD_BE_LSAN_TEST(expr-codegen-test)

View File

@@ -0,0 +1,216 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exprs/iceberg-functions.h"
#include <limits>
#include <vector>
#include "common/compiler-util.h"
#include "common/logging.h"
#include "runtime/timestamp-value.inline.h"
#include "thirdparty/murmurhash/MurmurHash3.h"
#include "udf/udf-internal.h"
#include "util/bit-util.h"
namespace impala {
using std::numeric_limits;
// Error text reported through FunctionContext::SetError() when the width / bucket
// count argument is NULL, zero or negative (see CheckInputsAndSetError()).
const std::string IcebergFunctions::INCORRECT_WIDTH_ERROR_MSG =
"Width parameter should be greater than zero.";
// Error text reported when the truncated value does not fit into the result type.
const std::string IcebergFunctions::TRUNCATE_OVERFLOW_ERROR_MSG =
"Truncate operation overflows for the given input.";
// Seed passed to MurmurHash3_x86_32() by every bucket transform in this file.
const unsigned IcebergFunctions::BUCKET_TRANSFORM_SEED = 0;
// Truncate partition transform for INT values. Width validation, NULL handling and
// the arithmetic itself live in TruncatePartitionTransformNumericImpl().
IntVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx,
    const IntVal& input, const IntVal& width) {
  const IntVal truncated = TruncatePartitionTransformNumericImpl(ctx, input, width);
  return truncated;
}
// Truncate partition transform for BIGINT values. Width validation, NULL handling and
// the arithmetic itself live in TruncatePartitionTransformNumericImpl().
BigIntVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx,
    const BigIntVal& input, const BigIntVal& width) {
  const BigIntVal truncated = TruncatePartitionTransformNumericImpl(ctx, input, width);
  return truncated;
}
// Truncate partition transform for DECIMAL values with an INT width. Widens the width
// to BIGINT and forwards to the BIGINT-width overload, which validates the arguments.
DecimalVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx,
    const DecimalVal& input, const IntVal& width) {
  // Carry the NULL flag over explicitly: BigIntVal(width.val) alone would produce a
  // non-NULL width whose value is whatever the NULL IntVal happens to hold, so a NULL
  // width would only be rejected by accident.
  const BigIntVal wide_width = width.is_null ? BigIntVal::null() : BigIntVal(width.val);
  return TruncatePartitionTransform(ctx, input, wide_width);
}
// Truncate partition transform for DECIMAL values with a BIGINT width. Returns NULL
// when the arguments are rejected by CheckInputsAndSetError(); otherwise looks up the
// byte size of the first argument's type and delegates to TruncateDecimal().
DecimalVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx,
    const DecimalVal& input, const BigIntVal& width) {
  const bool inputs_ok = CheckInputsAndSetError(ctx, input, width);
  if (!inputs_ok) return DecimalVal::null();
  const int byte_size =
      ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0);
  return TruncateDecimal(ctx, input, width, byte_size);
}
// Truncates the unscaled decimal in 'input' to a multiple of 'width'. 'decimal_size'
// is the byte size of the decimal representation (4, 8 or 16) and selects which member
// of 'input' is read. Returns NULL for any other size. Sets an overflow error on 'ctx'
// and returns NULL when the result cannot be represented.
DecimalVal IcebergFunctions::TruncateDecimal(FunctionContext* ctx,
    const DecimalVal& input, const BigIntVal& width, int decimal_size) {
  if (decimal_size == 4) {
    // A negative 4-byte input combined with a width above the int32 maximum would
    // have to truncate to a value that does not fit into int32, so report an
    // overflow instead of computing it.
    const bool width_exceeds_int32 = width.val > std::numeric_limits<int32_t>::max();
    if (UNLIKELY(input.val4 < 0 && width_exceeds_int32)) {
      ctx->SetError(TRUNCATE_OVERFLOW_ERROR_MSG.c_str());
      return DecimalVal::null();
    }
    return TruncatePartitionTransformDecimalImpl<int32_t>(input.val4, width.val);
  }
  if (decimal_size == 8) {
    return TruncatePartitionTransformDecimalImpl<int64_t>(input.val8, width.val);
  }
  if (decimal_size == 16) {
    DecimalVal truncated = TruncatePartitionTransformDecimalImpl(input.val16, width.val);
    // A negative input that yields a positive result is treated as an overflow.
    const bool sign_flipped = input.val16 < 0 && truncated.val16 > 0;
    if (sign_flipped) {
      ctx->SetError(TRUNCATE_OVERFLOW_ERROR_MSG.c_str());
      return DecimalVal::null();
    }
    return truncated;
  }
  // Unknown representation size.
  return DecimalVal::null();
}
// Rounds 'decimal_val' down to a multiple of 'width' (towards negative infinity for
// non-positive values). 'width' must be positive. No overflow checking is done here.
template<typename T>
DecimalVal IcebergFunctions::TruncatePartitionTransformDecimalImpl(const T& decimal_val,
    int64_t width) {
  DCHECK(width > 0);
  const auto remainder = decimal_val % width;
  if (decimal_val > 0) return decimal_val - remainder;
  // For non-positive values '%' yields a non-positive remainder; shift it into
  // [0, width) so that the subtraction rounds down.
  const auto distance_to_floor = (remainder + width) % width;
  return decimal_val - distance_to_floor;
}
// Truncate partition transform for STRING values. Returns NULL when the arguments are
// rejected by CheckInputsAndSetError(). Otherwise returns the longest prefix of 'input'
// holding at most 'width' UTF-8 code points. Iceberg truncates strings by code points,
// not bytes, so cutting at a byte offset could split a multi-byte character and differ
// from Iceberg's result. For single-byte (ASCII) data this equals the first 'width'
// bytes.
StringVal IcebergFunctions::TruncatePartitionTransform(FunctionContext* ctx,
    const StringVal& input, const IntVal& width) {
  if (!CheckInputsAndSetError(ctx, input, width)) return StringVal::null();
  // A string never has more code points than bytes, so nothing needs to be cut.
  if (input.len <= width.val) return input;
  int prefix_len = 0;
  int code_points = 0;
  for (; prefix_len < input.len; ++prefix_len) {
    // Every byte that is not a UTF-8 continuation byte (10xxxxxx) starts a code point.
    const bool starts_code_point = (input.ptr[prefix_len] & 0xC0) != 0x80;
    if (!starts_code_point) continue;
    // Stop in front of the first code point that would exceed the requested width.
    if (code_points == width.val) break;
    ++code_points;
  }
  if (prefix_len == input.len) return input;
  return StringVal::CopyFrom(ctx, input.ptr, prefix_len);
}
// Shared implementation of the truncate transform for integer types. Returns NULL when
// the arguments are rejected by CheckInputsAndSetError(). Otherwise rounds 'input' down
// to a multiple of 'width'. Sets an overflow error on 'ctx' and returns NULL when
// rounding a negative input down would fall below the minimum of the value type.
template<typename T, typename W>
T IcebergFunctions::TruncatePartitionTransformNumericImpl(FunctionContext* ctx,
    const T& input, const W& width) {
  if (!CheckInputsAndSetError(ctx, input, width)) return T::null();
  if (input.val >= 0) return input.val - (input.val % width.val);
  typedef decltype(input.val) ValueType;
  // For negative values '%' yields a remainder in (-width, 0]; shift it into
  // [0, width) to get the distance down to the next multiple of 'width'.
  const auto distance_to_floor = ((input.val % width.val) + width.val) % width.val;
  // Test for overflow before subtracting: signed overflow is undefined behaviour, so
  // inspecting the sign of an already overflowed result is not reliable.
  if (UNLIKELY(input.val < std::numeric_limits<ValueType>::min() + distance_to_floor)) {
    ctx->SetError(TRUNCATE_OVERFLOW_ERROR_MSG.c_str());
    return T::null();
  }
  return input.val - distance_to_floor;
}
// Bucket partition transform for INT values; 'width' is the number of buckets.
// Validation and hashing are done by BucketPartitionTransformNumericImpl().
IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx,
    const IntVal& input, const IntVal& width) {
  const IntVal bucket = BucketPartitionTransformNumericImpl(ctx, input, width);
  return bucket;
}
// Bucket partition transform for BIGINT values; 'width' is the number of buckets.
// Validation and hashing are done by BucketPartitionTransformNumericImpl().
IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx,
    const BigIntVal& input, const IntVal& width) {
  const IntVal bucket = BucketPartitionTransformNumericImpl(ctx, input, width);
  return bucket;
}
// Bucket partition transform for DECIMAL values. Returns NULL when the arguments are
// rejected by CheckInputsAndSetError(); otherwise looks up the byte size of the first
// argument's type and delegates to BucketDecimal().
IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx,
    const DecimalVal& input, const IntVal& width) {
  const bool inputs_ok = CheckInputsAndSetError(ctx, input, width);
  if (!inputs_ok) return IntVal::null();
  const int byte_size =
      ctx->impl()->GetConstFnAttr(FunctionContextImpl::ARG_TYPE_SIZE, 0);
  return BucketDecimal(ctx, input, width, byte_size);
}
// Computes the bucket of the unscaled decimal in 'input'. 'decimal_size' is the byte
// size of the decimal representation (4, 8 or 16) and selects which member of 'input'
// is read; any other size yields NULL. Both 'input' and 'width' must be non-NULL and
// 'width' must be positive: callers are expected to have validated them.
IntVal IcebergFunctions::BucketDecimal(FunctionContext* ctx,
    const DecimalVal& input, const IntVal& width, int decimal_size) {
  // Both arguments have to be non-NULL, hence '&&'.
  DCHECK(!input.is_null && !width.is_null);
  // 'width' is used as the divisor of the modulo below.
  DCHECK(width.val > 0);
  // Iceberg converts the decimal input's unscaled value into a byte array and hashes
  // that byte array instead of the underlying integer representation. We do the same
  // here to be compatible with Iceberg.
  std::string buffer;
  switch (decimal_size) {
    case 4: {
      buffer = BitUtil::IntToByteBuffer(input.val4);
      break;
    }
    case 8: {
      buffer = BitUtil::IntToByteBuffer(input.val8);
      break;
    }
    case 16: {
      buffer = BitUtil::IntToByteBuffer(input.val16);
      break;
    }
    default: {
      return IntVal::null();
    }
  }
  int hash_result;
  MurmurHash3_x86_32(buffer.data(), buffer.size(), BUCKET_TRANSFORM_SEED,
      &hash_result);
  // Clear the sign bit so that the modulo result is never negative.
  return (hash_result & std::numeric_limits<int>::max()) % width.val;
}
// Bucket partition transform for STRING values; 'width' is the number of buckets.
// Returns NULL when the arguments are rejected by CheckInputsAndSetError(), otherwise
// the MurmurHash3 (x86, 32 bit) of the string bytes with the sign bit cleared, modulo
// 'width'.
IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx,
    const StringVal& input, const IntVal& width) {
  const bool inputs_ok = CheckInputsAndSetError(ctx, input, width);
  if (!inputs_ok) return IntVal::null();
  int murmur_hash;
  MurmurHash3_x86_32(input.ptr, input.len, BUCKET_TRANSFORM_SEED, &murmur_hash);
  const int non_negative_hash = murmur_hash & std::numeric_limits<int>::max();
  return non_negative_hash % width.val;
}
// Bucket partition transform for DATE values: the integer stored in the DateVal is
// bucketed like an INT. A NULL date yields NULL before 'width' is looked at, because
// wrapping 'input.val' into an IntVal would otherwise lose the NULL flag.
IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx,
    const DateVal& input, const IntVal& width) {
  if (input.is_null) return IntVal::null();
  const IntVal date_as_int(input.val);
  return BucketPartitionTransformNumericImpl(ctx, date_as_int, width);
}
// Bucket partition transform for TIMESTAMP values. A NULL timestamp yields NULL. An
// invalid 'width' sets an error on 'ctx' and yields NULL. A timestamp that cannot be
// converted to microseconds since the Unix epoch yields NULL. Otherwise the
// microsecond value is bucketed like a BIGINT.
IntVal IcebergFunctions::BucketPartitionTransform(FunctionContext* ctx,
    const TimestampVal& input, const IntVal& width) {
  if (input.is_null) return IntVal::null();
  // Validate the width up front so that an invalid width is reported no matter
  // whether the timestamp conversion below succeeds.
  if (!CheckInputsAndSetError(ctx, input, width)) return IntVal::null();
  TimestampValue tv = TimestampValue::FromTimestampVal(input);
  // Iceberg stores timestamps in int64 so to be in line Impala's bucket partition
  // transform also uses an int64 representation.
  int64_t micros_since_epoch;
  // Do not hash an uninitialised value when the conversion fails.
  if (UNLIKELY(!tv.FloorUtcToUnixTimeMicros(&micros_since_epoch))) {
    return IntVal::null();
  }
  return BucketPartitionTransformNumericImpl(ctx, BigIntVal(micros_since_epoch), width);
}
// Shared implementation of the bucket transform for integer inputs; 'width' is the
// number of buckets. Returns NULL when the arguments are rejected by
// CheckInputsAndSetError(), otherwise the MurmurHash3 (x86, 32 bit) of the input
// widened to 8 bytes, with the sign bit cleared, modulo 'width'.
template<typename T, typename W>
IntVal IcebergFunctions::BucketPartitionTransformNumericImpl(FunctionContext* ctx,
    const T& input, const W& width) {
  if (!CheckInputsAndSetError(ctx, input, width)) return IntVal::null();
  // Iceberg's int based bucket partition transform converts the int input to long
  // (8 bytes) before getting the hash value of it. Use a fixed-width int64_t rather
  // than 'long', whose size depends on the platform, to guarantee compatibility.
  const int64_t key = static_cast<int64_t>(input.val);
  int hash_result;
  MurmurHash3_x86_32(&key, sizeof(key), BUCKET_TRANSFORM_SEED, &hash_result);
  // Clear the sign bit so that the modulo result is never negative.
  return (hash_result & std::numeric_limits<int>::max()) % width.val;
}
// Validates the arguments of a partition transform. A NULL or non-positive 'width'
// sets INCORRECT_WIDTH_ERROR_MSG on 'ctx' and returns false. With a valid width the
// result is false for a NULL 'input' (no error is set) and true otherwise.
template<typename T, typename W>
bool IcebergFunctions::CheckInputsAndSetError(FunctionContext* ctx, const T& input,
    const W& width) {
  const bool width_is_valid = !width.is_null && width.val > 0;
  if (!width_is_valid) {
    ctx->SetError(INCORRECT_WIDTH_ERROR_MSG.c_str());
    return false;
  }
  return !input.is_null;
}
}

View File

@@ -0,0 +1,593 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exprs/iceberg-functions.h"
#include "thirdparty/murmurhash/MurmurHash3.h"
#include <limits>
#include <string>
#include <vector>
#include "runtime/date-value.h"
#include "runtime/exec-env.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/runtime-state.h"
#include "runtime/timestamp-value.h"
#include "testutil/gtest-util.h"
#include "udf/udf-internal.h"
namespace impala {
// Create a FunctionContext on top of the mem pool supplied by the caller; the pool is
// used both as the context's pool and as its results pool. Use this for tests that
// allocate memory through the context, e.g. the truncate string tests. The context is
// described as taking two INT arguments and returning INT.
FunctionContext* CreateFunctionContext(
    MemPool* pool) {
  FunctionContext::TypeDesc int_arg_desc;
  int_arg_desc.type = FunctionContext::Type::TYPE_INT;
  std::vector<FunctionContext::TypeDesc> arg_types(2, int_arg_desc);
  FunctionContext::TypeDesc int_return_desc;
  int_return_desc.type = FunctionContext::Type::TYPE_INT;
  FunctionContext* new_ctx = FunctionContextImpl::CreateContext(
      nullptr, pool, pool, int_return_desc, arg_types, 0, true);
  EXPECT_TRUE(new_ctx != nullptr);
  return new_ctx;
}
// Create a FunctionContext backed by a function-local mem pool. Meant for tests that
// do not allocate memory through the context; tests that do should pass their own pool
// to the overload taking a MemPool*.
// NOTE(review): 'm' and 'pool' are destroyed when this function returns, while the
// returned context was created with a pointer to 'pool'. This presumably stays
// harmless only as long as nothing allocates through the context -- confirm.
FunctionContext* CreateFunctionContext() {
MemTracker m;
MemPool pool(&m);
return CreateFunctionContext(&pool);
}
// numeric_limits seems to have a bug when used with __int128_t as it returns zero
// for both the min and the max value. This function gives the min value for a 128 bit
// int, i.e. -2^127.
__int128_t GetMinForInt128() {
  // The two's complement minimum has only the sign bit set. Build the pattern in the
  // unsigned domain, where shifting into the top bit is well defined, instead of
  // left-shifting a negative value.
  return static_cast<__int128_t>(static_cast<__uint128_t>(1) << 127);
}
// All the expected values in the tests below reflect what Iceberg would return
// for the given inputs. I used Iceberg's org.apache.iceberg.transforms.TestBucketing
// unit tests to check what the expected result is for a particular input.

// Static test drivers for the truncate partition transform; invoked from the
// TruncateTransform gtest case.
class IcebergTruncatePartitionTransformTests {
public:
// Runs the shared integer checks for IntVal and BigIntVal plus a BigInt-only check.
static void TestIntegerNumbers();
// Checks truncation of StringVal inputs.
static void TestString();
// Checks truncation of DecimalVal inputs through IcebergFunctions::TruncateDecimal().
static void TestDecimal();
private:
// Integer checks shared by the value types; T is used for both the input and the
// width.
template<typename T>
static void TestIntegerNumbersHelper();
// Expects the width error for a zero, negative and NULL width of type W.
template<typename T, typename W>
static void TestIncorrectWidthParameter(const T& input);
};
// Static test drivers for the bucket partition transform; invoked from the
// BucketTransform gtest case.
class IcebergBucketPartitionTransformTests {
public:
// Runs the shared integer checks for IntVal and BigIntVal plus min/max inputs.
static void TestIntegerNumbers();
// Checks bucketing of StringVal inputs.
static void TestString();
// Checks bucketing of DecimalVal inputs through IcebergFunctions::BucketDecimal().
static void TestDecimal();
// Checks bucketing of DateVal inputs.
static void TestDate();
// Checks bucketing of TimestampVal inputs.
static void TestTimestamp();
private:
// Integer checks shared by the input value types.
template<typename T>
static void TestIntegerNumbersHelper();
// Expects the width error for a zero, negative and NULL width.
template<typename T>
static void TestIncorrectWidthParameter(const T& input);
};
void IcebergTruncatePartitionTransformTests::TestIntegerNumbers() {
IcebergTruncatePartitionTransformTests::TestIntegerNumbersHelper<IntVal>();
IcebergTruncatePartitionTransformTests::TestIntegerNumbersHelper<BigIntVal>();
// Check that BigInt width is valid.
int64_t max_value64 = std::numeric_limits<int64_t>::max();
BigIntVal ret_val = IcebergFunctions::TruncatePartitionTransform(nullptr,
BigIntVal(max_value64), BigIntVal(max_value64));
EXPECT_EQ(max_value64, ret_val.val);
}
// Integer truncate checks for value type T (used for both input and width): positive
// and negative inputs, NULL input, overflow at the minimum value and invalid widths.
template<typename T>
void IcebergTruncatePartitionTransformTests::TestIntegerNumbersHelper() {
  struct TruncateCase {
    int input;
    int width;
    int expected;
  };
  const TruncateCase cases[] = {
      // Positive inputs.
      {15, 10, 10},
      {15, 4, 12},
      // Negative inputs.
      {-1, 10, -10},
      {-15, 4, -16},
      {-15, 1, -15},
  };
  for (const TruncateCase& c : cases) {
    T truncated = IcebergFunctions::TruncatePartitionTransform(
        nullptr, T(c.input), T(c.width));
    EXPECT_EQ(c.expected, truncated.val);
  }

  // NULL input gives NULL output.
  T ret_val = IcebergFunctions::TruncatePartitionTransform(nullptr, T::null(), T(10));
  EXPECT_TRUE(ret_val.is_null);

  // Truncating the minimum value cannot be represented and must report an overflow.
  using Underlying = typename T::underlying_type_t;
  const Underlying lowest = std::numeric_limits<Underlying>::min();
  FunctionContext* ctx = CreateFunctionContext();
  ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, T(lowest), T(10000));
  EXPECT_TRUE(ret_val.is_null);
  EXPECT_EQ(strcmp("Truncate operation overflows for the given input.",
      ctx->error_msg()), 0);
  ctx->impl()->Close();

  TestIncorrectWidthParameter<T, T>(T(0));
}
// String truncate checks. A single FunctionContext on top of a local mem pool is used
// for every call that may allocate a result, and it is closed before the pool is
// freed. Previously a second context was created mid-test, overwriting the first one
// without closing it, and neither was ever closed.
void IcebergTruncatePartitionTransformTests::TestString() {
  MemTracker m;
  MemPool pool(&m);
  StringVal input("Some test input");
  FunctionContext* ctx = CreateFunctionContext(&pool);
  int width = 10;
  StringVal ret_val = IcebergFunctions::TruncatePartitionTransform(
      ctx, input, IntVal(width));
  EXPECT_EQ(ret_val.len, width);
  EXPECT_EQ(strncmp((char*)input.ptr, (char*)ret_val.ptr, width), 0);
  // Truncate width is longer than the input string.
  ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, 100);
  EXPECT_EQ(ret_val.len, input.len);
  EXPECT_EQ(strncmp((char*)input.ptr, (char*)ret_val.ptr, input.len), 0);
  // Truncate width is the same as the input string length.
  ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, input.len);
  EXPECT_EQ(ret_val.len, input.len);
  EXPECT_EQ(strncmp((char*)input.ptr, (char*)ret_val.ptr, input.len), 0);
  // Test NULL input.
  ret_val = IcebergFunctions::TruncatePartitionTransform(
      nullptr, StringVal::null(), IntVal(10));
  EXPECT_TRUE(ret_val.is_null);
  // Test empty string input.
  ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, "", 10);
  EXPECT_EQ(ret_val.len, 0);
  ctx->impl()->Close();
  TestIncorrectWidthParameter<StringVal, IntVal>(StringVal("input"));
  pool.FreeAll();
}
// Decimal truncate checks for the 4, 8 and 16 byte representations, including the
// overflow cases, NULL input and invalid widths.
void IcebergTruncatePartitionTransformTests::TestDecimal() {
  // Testing decimal in unit tests by invoking TruncatePartitionTransform seems
  // problematic as it queries ARG_TYPE_SIZE from FunctionContext. Apparently, it is not
  // properly set in unit tests. Use TruncateDecimal instead that gets the size as
  // parameter.
  DecimalVal ret_val = IcebergFunctions::TruncateDecimal(nullptr, 10050, 100, 4);
  EXPECT_EQ(10000, ret_val.val4);
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, 100, 8, 4);
  EXPECT_EQ(96, ret_val.val4);
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, 0, 10, 4);
  EXPECT_EQ(0, ret_val.val4);
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, -1, 10, 4);
  EXPECT_EQ(-10, ret_val.val4);
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, 12345, 1, 4);
  EXPECT_EQ(12345, ret_val.val4);

  // Test BigInt width with positive int32 Decimal inputs.
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, 12345, 123456789012, 4);
  EXPECT_EQ(0, ret_val.val4);
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, 12345, 100000000001, 4);
  EXPECT_EQ(0, ret_val.val4);

  // Test BigInt width with negative int32 Decimal inputs. In this case int32 is not big
  // enough to store the result and an overflow error is expected.
  FunctionContext* ctx = CreateFunctionContext();
  ret_val = IcebergFunctions::TruncateDecimal(ctx, -12345, 123456789012, 4);
  EXPECT_TRUE(ret_val.is_null);
  EXPECT_EQ(strcmp("Truncate operation overflows for the given input.",
      ctx->error_msg()), 0);
  ctx->impl()->Close();
  ctx = CreateFunctionContext();
  ret_val = IcebergFunctions::TruncateDecimal(ctx, -12345, 100000000001, 4);
  EXPECT_TRUE(ret_val.is_null);
  EXPECT_EQ(strcmp("Truncate operation overflows for the given input.",
      ctx->error_msg()), 0);
  ctx->impl()->Close();

  // Test the maximum limit for each representation size.
  // Note, decimal type represent int32 max in 8 bytes, int64 max in 16 bytes. Read the
  // result through the member that matches the representation size that was requested.
  int32_t int32_max_value = std::numeric_limits<int32_t>::max();
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, int32_max_value, 50, 8);
  EXPECT_EQ((int64_t)(int32_max_value - (int32_max_value % 50)), ret_val.val8);
  int64_t int64_max_value = std::numeric_limits<int64_t>::max();
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, int64_max_value, 50, 16);
  EXPECT_EQ((__int128_t)(int64_max_value - (int64_max_value % 50)), ret_val.val16);
  // numeric_limits is not reliable for __int128_t (it may return zero), so build the
  // maximum from its bit pattern: every bit set except the sign bit.
  __int128_t int128_max_value =
      static_cast<__int128_t>(~static_cast<__uint128_t>(0) >> 1);
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, int128_max_value, 50, 16);
  EXPECT_EQ(int128_max_value - (int128_max_value % 50), ret_val.val16);

  // Test the minimum limit for each representation size.
  // Note, decimal type represent int32 min in 8 bytes, int64 min in 16 bytes.
  int32_t int32_min_value = std::numeric_limits<int32_t>::min();
  int width = 50;
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, int32_min_value, width, 8);
  int64_t expected64 = (int64_t)int32_min_value - (width + (int32_min_value % width));
  EXPECT_EQ(expected64, ret_val.val8);
  int64_t int64_min_value = std::numeric_limits<int64_t>::min();
  ret_val = IcebergFunctions::TruncateDecimal(nullptr, int64_min_value, width, 16);
  __int128_t expected128 =
      (__int128_t)int64_min_value - (width + (int64_min_value % width));
  EXPECT_EQ(expected128, ret_val.val16);

  // Truncation from int128 minimum value causes an overflow.
  ctx = CreateFunctionContext();
  ret_val = IcebergFunctions::TruncateDecimal(ctx, GetMinForInt128(), width, 16);
  EXPECT_TRUE(ret_val.is_null);
  EXPECT_EQ(strcmp("Truncate operation overflows for the given input.",
      ctx->error_msg()), 0);
  ctx->impl()->Close();

  // Test NULL input.
  ret_val = IcebergFunctions::TruncatePartitionTransform(
      nullptr, DecimalVal::null(), BigIntVal(10));
  EXPECT_TRUE(ret_val.is_null);

  TestIncorrectWidthParameter<DecimalVal, IntVal>(DecimalVal(0));
  TestIncorrectWidthParameter<DecimalVal, BigIntVal>(DecimalVal(0));
}
// Expects the truncate transform to return NULL and to set the width error message
// for a zero, a negative and a NULL width of type W. Each width is checked with a
// fresh FunctionContext.
template<typename T, typename W>
void IcebergTruncatePartitionTransformTests::TestIncorrectWidthParameter(
    const T& input) {
  // Zero, negative and NULL width, in this order.
  const W invalid_widths[] = {W(0), W(-1), W::null()};
  for (const W& invalid_width : invalid_widths) {
    FunctionContext* ctx = CreateFunctionContext();
    T ret_val = IcebergFunctions::TruncatePartitionTransform(ctx, input, invalid_width);
    EXPECT_TRUE(ret_val.is_null);
    EXPECT_EQ(
        strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0);
    ctx->impl()->Close();
  }
}
void IcebergBucketPartitionTransformTests::TestIntegerNumbers() {
TestIntegerNumbersHelper<IntVal>();
TestIntegerNumbersHelper<BigIntVal>();
// Check for max input value.
IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
IntVal(std::numeric_limits<int32_t>::max()), 100);
EXPECT_EQ(6, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
BigIntVal(std::numeric_limits<int64_t>::max()), 100);
EXPECT_EQ(99, ret_val.val);
// Check for min input value.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
IntVal(std::numeric_limits<int32_t>::min()), 1000);
EXPECT_EQ(856, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
BigIntVal(std::numeric_limits<int64_t>::min()), 1000);
EXPECT_EQ(829, ret_val.val);
}
template<typename T>
void IcebergBucketPartitionTransformTests::TestIntegerNumbersHelper() {
IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(0), 1000000);
EXPECT_EQ(671676, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(0), 100);
EXPECT_EQ(76, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(12345), 50);
EXPECT_EQ(41, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(12345), 12345);
EXPECT_EQ(12311, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(-987654321), 14500);
EXPECT_EQ(7489, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T(-155), 100);
EXPECT_EQ(99, ret_val.val);
// Check for null input.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, T::null(), 1);
EXPECT_TRUE(ret_val.is_null);
TestIncorrectWidthParameter<T>(T(0));
}
void IcebergBucketPartitionTransformTests::TestString() {
IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, "Input string",
100);
EXPECT_EQ(52, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, "Input string",
12345);
EXPECT_EQ(5717, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
" ---====1234567890====--- ", 100);
EXPECT_EQ(27, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
" ---====1234567890====--- ", 654321);
EXPECT_EQ(641267, ret_val.val);
// Check for empty string input.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, "", 10);
EXPECT_EQ(0, ret_val.val);
// Check for null input.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, StringVal::null(), 1);
EXPECT_TRUE(ret_val.is_null);
TestIncorrectWidthParameter<StringVal>(StringVal("input"));
}
// Decimal bucket checks for the 4, 8 and 16 byte representations, NULL input and
// invalid widths. The invalid-width check runs once: the bucket transform has only
// one width type, so the second, identical call was a copy-paste leftover.
void IcebergBucketPartitionTransformTests::TestDecimal() {
  // Testing decimal in unit tests by invoking BucketPartitionTransform seems
  // problematic as it queries ARG_TYPE_SIZE from FunctionContext. Apparently, it is not
  // properly set in unit tests. Use BucketDecimal instead that skips checking function
  // context for ARG_TYPE_SIZE.

  // Test int32 based decimal inputs.
  IntVal ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345, 100, 4);
  EXPECT_EQ(28, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345, 555, 4);
  EXPECT_EQ(393, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 2047, 900, 4);
  EXPECT_EQ(439, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 0, 10, 4);
  EXPECT_EQ(7, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, -987654321, 15000, 4);
  EXPECT_EQ(8732, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, -987654321, 2000, 4);
  EXPECT_EQ(1732, ret_val.val);

  // Test int64 based decimal inputs.
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345678901L, 10000, 8);
  EXPECT_EQ(5191, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 12345678901L, 150, 8);
  EXPECT_EQ(41, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 330011994488229955L, 4800, 8);
  EXPECT_EQ(1114, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, 330011994488229955L, 73400, 8);
  EXPECT_EQ(12914, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, -360111674415228955L, 5000, 8);
  EXPECT_EQ(187, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(nullptr, -360111674415228955L, 9800, 8);
  EXPECT_EQ(9187, ret_val.val);

  // Test int128 based decimal inputs.
  ret_val = IcebergFunctions::BucketDecimal(
      nullptr, __int128_t(9223372036854775807) + 1, 5550, 16);
  EXPECT_EQ(651, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(
      nullptr, __int128_t(9223372036854775807) + 1, 18000, 16);
  EXPECT_EQ(12951, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(
      nullptr, __int128_t(123321456654789987) * 1000, 18000, 16);
  EXPECT_EQ(15169, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(
      nullptr, __int128_t(-123321456654789987) * 1000, 11000, 16);
  EXPECT_EQ(8353, ret_val.val);
  ret_val = IcebergFunctions::BucketDecimal(
      nullptr, __int128_t(-123321456654789987) * 1000, 7300, 16);
  EXPECT_EQ(853, ret_val.val);

  // Test NULL input.
  ret_val = IcebergFunctions::BucketPartitionTransform(
      nullptr, DecimalVal::null(), IntVal(10));
  EXPECT_TRUE(ret_val.is_null);

  TestIncorrectWidthParameter<DecimalVal>(DecimalVal(0));
}
void IcebergBucketPartitionTransformTests::TestDate() {
IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
DateValue(2017, 11, 16).ToDateVal(), 5000);
EXPECT_EQ(3226, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
DateValue(2017, 11, 16).ToDateVal(), 150);
EXPECT_EQ(76, ret_val.val);
// Test for values pre 1970.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
DateValue(1901, 12, 5).ToDateVal(), 30);
EXPECT_EQ(13, ret_val.val);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr,
DateValue(50, 3, 11).ToDateVal(), 1900);
EXPECT_EQ(1539, ret_val.val);
// Check for null input.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, DateVal::null(), 1);
EXPECT_TRUE(ret_val.is_null);
TestIncorrectWidthParameter<DateVal>(DateVal(0));
}
void IcebergBucketPartitionTransformTests::TestTimestamp() {
TimestampVal tv;
TimestampValue::ParseSimpleDateFormat("2017-11-16 22:31:08").ToTimestampVal(&tv);
IntVal ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 100);
EXPECT_EQ(7, ret_val.val);
TimestampValue::ParseSimpleDateFormat("2017-11-16 22:31:08").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 1150);
EXPECT_EQ(957, ret_val.val);
// Check that zero fractional seconts doesn't change the output.
TimestampValue::ParseSimpleDateFormat(
"2017-11-16 22:31:08.000000").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 100);
EXPECT_EQ(7, ret_val.val);
TimestampValue::ParseSimpleDateFormat(
"2017-11-16 22:31:08.000000000").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 100);
EXPECT_EQ(7, ret_val.val);
// Checks for non-zero fractional seconds.
TimestampValue::ParseSimpleDateFormat("2010-10-09 12:39:28.123").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 250);
EXPECT_EQ(107, ret_val.val);
TimestampValue::ParseSimpleDateFormat(
"2010-10-09 12:39:28.123456").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 250);
EXPECT_EQ(115, ret_val.val);
// Check that the 7th, 8th, 9th digit if the fractional second doesn't have effect on
// the result. This is to follow Iceberg's behaviour.
TimestampValue::ParseSimpleDateFormat(
"2010-10-09 12:39:28.123456789").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 250);
EXPECT_EQ(115, ret_val.val);
// Test for values pre 1970.
TimestampValue::ParseSimpleDateFormat(
"1905-01-02 12:20:25").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 2000);
EXPECT_EQ(809, ret_val.val);
TimestampValue::ParseSimpleDateFormat(
"1905-01-02 12:20:25.123456").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 2000);
EXPECT_EQ(932, ret_val.val);
TimestampValue::ParseSimpleDateFormat(
"1905-01-02 12:20:25.123456789").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 2000);
EXPECT_EQ(932, ret_val.val);
TimestampValue::ParseSimpleDateFormat(
"1905-01-02 12:20:25").ToTimestampVal(&tv);
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, tv, 8000);
EXPECT_EQ(2809, ret_val.val);
// Check for null input.
ret_val = IcebergFunctions::BucketPartitionTransform(nullptr, TimestampVal::null(), 1);
EXPECT_TRUE(ret_val.is_null);
TestIncorrectWidthParameter<TimestampVal>(TimestampVal(0));
}
template<typename T>
void IcebergBucketPartitionTransformTests::TestIncorrectWidthParameter(const T& input) {
// Check for error when width is zero.
FunctionContext* ctx = CreateFunctionContext();
IntVal ret_val = IcebergFunctions::BucketPartitionTransform(ctx, input, 0);
EXPECT_TRUE(ret_val.is_null);
EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0);
ctx->impl()->Close();
// Check for error when width is negative.
ctx = CreateFunctionContext();
ret_val = IcebergFunctions::BucketPartitionTransform(ctx, input, -1);
EXPECT_TRUE(ret_val.is_null);
EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0);
ctx->impl()->Close();
// Check for error when width is null.
ctx = CreateFunctionContext();
ret_val = IcebergFunctions::BucketPartitionTransform(ctx, input, IntVal::null());
EXPECT_TRUE(ret_val.is_null);
EXPECT_EQ(strcmp("Width parameter should be greater than zero.", ctx->error_msg()), 0);
ctx->impl()->Close();
}
// Runs the truncate partition transform checks for each input category that
// IcebergTruncatePartitionTransformTests provides: integer numbers, strings and
// decimals.
TEST(TestIcebergFunctions, TruncateTransform) {
IcebergTruncatePartitionTransformTests::TestIntegerNumbers();
IcebergTruncatePartitionTransformTests::TestString();
IcebergTruncatePartitionTransformTests::TestDecimal();
}
// Runs the bucket partition transform checks for each input category that
// IcebergBucketPartitionTransformTests provides: integer numbers, strings, decimals,
// dates and timestamps.
TEST(TestIcebergFunctions, BucketTransform) {
IcebergBucketPartitionTransformTests::TestIntegerNumbers();
IcebergBucketPartitionTransformTests::TestString();
IcebergBucketPartitionTransformTests::TestDecimal();
IcebergBucketPartitionTransformTests::TestDate();
IcebergBucketPartitionTransformTests::TestTimestamp();
}
}

View File

@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "udf/udf.h"
#include <string>
#include <vector>
namespace impala {
using impala_udf::BigIntVal;
using impala_udf::DateVal;
using impala_udf::DecimalVal;
using impala_udf::FunctionContext;
using impala_udf::IntVal;
using impala_udf::StringVal;
using impala_udf::TimestampVal;
/// This class holds functions that are related to Iceberg functionality.
/// E.g. implementations of partition transforms.
/// All members are static; the class only serves as a scope for the functions.
class IcebergFunctions {
public:
/// The following functions implement the truncate partition transform that
/// partitioned Iceberg tables use. Each overload takes the value to truncate in 'input'
/// and the truncation width in 'width', and returns a value of the same type as
/// 'input'.
/// See https://iceberg.apache.org/spec/#truncate-transform-details for the expected
/// behaviour.
static IntVal TruncatePartitionTransform(FunctionContext* ctx,
const IntVal& input, const IntVal& width);
static BigIntVal TruncatePartitionTransform(FunctionContext* ctx,
const BigIntVal& input, const BigIntVal& width);
static DecimalVal TruncatePartitionTransform(FunctionContext* ctx,
const DecimalVal& input, const IntVal& width);
static DecimalVal TruncatePartitionTransform(FunctionContext* ctx,
const DecimalVal& input, const BigIntVal& width);
static StringVal TruncatePartitionTransform(FunctionContext* ctx,
const StringVal& input, const IntVal& width);
/// The following functions implement the bucket partition transform that
/// partitioned Iceberg tables use. Each overload takes the value to hash in 'input' and
/// the number of buckets in 'width', and always returns an IntVal regardless of the
/// input type.
/// See https://iceberg.apache.org/spec/#bucket-transform-details for the expected
/// behaviour.
static IntVal BucketPartitionTransform(FunctionContext* ctx,
const IntVal& input, const IntVal& width);
static IntVal BucketPartitionTransform(FunctionContext* ctx,
const BigIntVal& input, const IntVal& width);
static IntVal BucketPartitionTransform(FunctionContext* ctx,
const DecimalVal& input, const IntVal& width);
static IntVal BucketPartitionTransform(FunctionContext* ctx,
const StringVal& input, const IntVal& width);
static IntVal BucketPartitionTransform(FunctionContext* ctx,
const DateVal& input, const IntVal& width);
static IntVal BucketPartitionTransform(FunctionContext* ctx,
const TimestampVal& input, const IntVal& width);
private:
/// The test classes need access to the private constants and helpers below.
friend class IcebergTruncatePartitionTransformTests;
friend class IcebergBucketPartitionTransformTests;
/// Error message to raise when width parameter is not positive.
static const std::string INCORRECT_WIDTH_ERROR_MSG;
/// Error message to raise when truncate overflows for an int input.
static const std::string TRUNCATE_OVERFLOW_ERROR_MSG;
/// The seed used for bucket transform.
static const unsigned BUCKET_TRANSFORM_SEED;
/// Checks 'input' for null, 'width' for null and non-positive values. Returns false if
/// any of the checks fail, true otherwise. If any of the width checks fail this also
/// sets an error message in 'ctx'.
template<typename T, typename W>
static bool CheckInputsAndSetError(FunctionContext* ctx, const T& input,
const W& width);
/// Templated helper for the truncate transform; returns the same type as 'input'.
/// NOTE(review): presumably shared by the IntVal and BigIntVal overloads of
/// TruncatePartitionTransform - confirm against the implementation file.
template<typename T, typename W>
static T TruncatePartitionTransformNumericImpl(FunctionContext* ctx, const T& input,
const W& width);
/// Helper function for TruncatePartitionTransform for Decimals where the size of the
/// decimal representation is given as a parameter.
static DecimalVal TruncateDecimal(FunctionContext* ctx, const DecimalVal& input,
const BigIntVal& width, int decimal_size);
/// Templated helper for the decimal truncate transform that takes the width as a plain
/// int64_t and returns the result as a DecimalVal.
/// NOTE(review): 'T' is presumably the underlying decimal representation selected by
/// TruncateDecimal() - confirm against the implementation file.
template<typename T>
static DecimalVal TruncatePartitionTransformDecimalImpl(const T& decimal_val,
int64_t width);
/// Templated helper for the bucket transform; returns the result as an IntVal.
/// NOTE(review): presumably shared by the numeric overloads of
/// BucketPartitionTransform - confirm against the implementation file.
template<typename T, typename W>
static IntVal BucketPartitionTransformNumericImpl(FunctionContext* ctx, const T& input,
const W& width);
/// Helper function for BucketPartitionTransform for Decimals where the size of the
/// decimal representation is given as a parameter.
static IntVal BucketDecimal(FunctionContext* ctx, const DecimalVal& input,
const IntVal& width, int decimal_size);
};
}

View File

@@ -28,11 +28,12 @@
#include "exprs/cast-functions.h" #include "exprs/cast-functions.h"
#include "exprs/compound-predicates.h" #include "exprs/compound-predicates.h"
#include "exprs/conditional-functions.h" #include "exprs/conditional-functions.h"
#include "datasketches-functions.h" #include "exprs/datasketches-functions.h"
#include "exprs/date-functions.h" #include "exprs/date-functions.h"
#include "exprs/decimal-functions.h" #include "exprs/decimal-functions.h"
#include "exprs/decimal-operators.h" #include "exprs/decimal-operators.h"
#include "exprs/hive-udf-call.h" #include "exprs/hive-udf-call.h"
#include "exprs/iceberg-functions.h"
#include "exprs/in-predicate.h" #include "exprs/in-predicate.h"
#include "exprs/is-not-empty-predicate.h" #include "exprs/is-not-empty-predicate.h"
#include "exprs/is-null-predicate.h" #include "exprs/is-null-predicate.h"
@@ -435,6 +436,7 @@ void ScalarExprEvaluator::InitBuiltinsDummy() {
DataSketchesFunctions::DsHllEstimate(nullptr, StringVal::null()); DataSketchesFunctions::DsHllEstimate(nullptr, StringVal::null());
DecimalFunctions::Precision(nullptr, DecimalVal::null()); DecimalFunctions::Precision(nullptr, DecimalVal::null());
DecimalOperators::CastToDecimalVal(nullptr, DecimalVal::null()); DecimalOperators::CastToDecimalVal(nullptr, DecimalVal::null());
IcebergFunctions::TruncatePartitionTransform(nullptr, IntVal::null(), IntVal::null());
InPredicate::InIterate(nullptr, BigIntVal::null(), 0, nullptr); InPredicate::InIterate(nullptr, BigIntVal::null(), 0, nullptr);
IsNullPredicate::IsNull(nullptr, BooleanVal::null()); IsNullPredicate::IsNull(nullptr, BooleanVal::null());
LikePredicate::Like(nullptr, StringVal::null(), StringVal::null()); LikePredicate::Like(nullptr, StringVal::null(), StringVal::null());

View File

@@ -0,0 +1,335 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// Note - The x86 and x64 versions do _not_ produce the same results, as the
// algorithms are optimized for their respective platforms. You can still
// compile and run any of them on any platform, but your performance with the
// non-native version will be less than optimal.
#include "MurmurHash3.h"

#include <string.h>
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
// Microsoft Visual Studio
#if defined(_MSC_VER)
#define FORCE_INLINE __forceinline
#include <stdlib.h>
#define ROTL32(x,y) _rotl(x,y)
#define ROTL64(x,y) _rotl64(x,y)
#define BIG_CONSTANT(x) (x)
// Other compilers
#else // defined(_MSC_VER)
#define FORCE_INLINE inline __attribute__((always_inline))
// Rotates the 32-bit value 'x' left by 'r' bits.
// Both shift counts are reduced modulo 32 so that a rotation count of 0 (or any
// multiple of 32) never results in a shift by the full width of the type, which is
// undefined behaviour. For counts in [1, 31] the result is identical to the plain
// (x << r) | (x >> (32 - r)) formulation.
inline uint32_t rotl32 ( uint32_t x, int8_t r )
{
  const unsigned left = static_cast<unsigned>(r) & 31u;
  const unsigned right = (32u - left) & 31u;
  return (x << left) | (x >> right);
}
// Rotates the 64-bit value 'x' left by 'r' bits.
// Both shift counts are reduced modulo 64 so that a rotation count of 0 (or any
// multiple of 64) never results in a shift by the full width of the type, which is
// undefined behaviour. For counts in [1, 63] the result is identical to the plain
// (x << r) | (x >> (64 - r)) formulation.
inline uint64_t rotl64 ( uint64_t x, int8_t r )
{
  const unsigned left = static_cast<unsigned>(r) & 63u;
  const unsigned right = (64u - left) & 63u;
  return (x << left) | (x >> right);
}
#define ROTL32(x,y) rotl32(x,y)
#define ROTL64(x,y) rotl64(x,y)
#define BIG_CONSTANT(x) (x##LLU)
#endif // !defined(_MSC_VER)
//-----------------------------------------------------------------------------
// Block read - if your platform needs to do endian-swapping or can only
// handle aligned reads, do the conversion here
// Returns the 'i'th 32-bit block relative to 'p' in native byte order. 'i' may be
// negative (the x86_32 variant indexes backwards from the end of the block region).
// The hash functions build 'p' from an arbitrary byte address, so the block is loaded
// with memcpy instead of dereferencing 'p' directly: a direct dereference of a
// misaligned pointer is undefined behaviour, while the memcpy yields the same value.
FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
{
  const uint8_t * bytes = (const uint8_t*)p;
  uint32_t block;
  memcpy(&block, bytes + i * (int)sizeof(block), sizeof(block));
  return block;
}
// Returns the 'i'th 64-bit block relative to 'p' in native byte order.
// The hash functions build 'p' from an arbitrary byte address, so the block is loaded
// with memcpy instead of dereferencing 'p' directly: a direct dereference of a
// misaligned pointer is undefined behaviour, while the memcpy yields the same value.
FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )
{
  const uint8_t * bytes = (const uint8_t*)p;
  uint64_t block;
  memcpy(&block, bytes + i * (int)sizeof(block), sizeof(block));
  return block;
}
//-----------------------------------------------------------------------------
// Finalization mix - force all bits of a hash block to avalanche
// 32-bit finalization mix: alternates xor-with-right-shift steps (16, 13, 16) with two
// multiplications by fixed constants and returns the mixed value.
FORCE_INLINE uint32_t fmix32 ( uint32_t h )
{
  h = (h ^ (h >> 16)) * 0x85ebca6b;
  h = (h ^ (h >> 13)) * 0xc2b2ae35;
  return h ^ (h >> 16);
}
//----------
// 64-bit finalization mix: alternates xor-with-right-shift-by-33 steps with two
// multiplications by fixed constants and returns the mixed value.
FORCE_INLINE uint64_t fmix64 ( uint64_t k )
{
  k = (k ^ (k >> 33)) * BIG_CONSTANT(0xff51afd7ed558ccd);
  k = (k ^ (k >> 33)) * BIG_CONSTANT(0xc4ceb9fe1a85ec53);
  return k ^ (k >> 33);
}
//-----------------------------------------------------------------------------
// Computes a 32-bit hash of the 'len' bytes starting at 'key', using 'seed' as the
// initial hash state. The result is written to 'out' as a single uint32_t, so 'out'
// must point to at least 4 writable bytes.
void MurmurHash3_x86_32 ( const void * key, int len,
uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
// Number of complete 4-byte blocks; the remaining 0-3 bytes are handled as the tail.
const int nblocks = len / 4;
uint32_t h1 = seed;
const uint32_t c1 = 0xcc9e2d51;
const uint32_t c2 = 0x1b873593;
//----------
// body
// 'blocks' points just past the last complete block; the loop walks the blocks with
// negative indices counting up to zero.
const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
for(int i = -nblocks; i; i++)
{
uint32_t k1 = getblock32(blocks,i);
k1 *= c1;
k1 = ROTL32(k1,15);
k1 *= c2;
h1 ^= k1;
h1 = ROTL32(h1,13);
h1 = h1*5+0xe6546b64;
}
//----------
// tail
const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
uint32_t k1 = 0;
// The cases fall through on purpose: each one adds one more trailing byte to k1
// before the final mixing step under 'case 1'.
switch(len & 3)
{
case 3: k1 ^= tail[2] << 16;
case 2: k1 ^= tail[1] << 8;
case 1: k1 ^= tail[0];
k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
};
//----------
// finalization
// Fold the input length into the state before the final avalanche.
h1 ^= len;
h1 = fmix32(h1);
*(uint32_t*)out = h1;
}
//-----------------------------------------------------------------------------
// Computes a 128-bit hash of the 'len' bytes starting at 'key' using four 32-bit state
// words, each initialised to 'seed'. The result is written to 'out' as four
// consecutive uint32_t values, so 'out' must point to at least 16 writable bytes.
void MurmurHash3_x86_128 ( const void * key, const int len,
uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
// Number of complete 16-byte blocks; the remaining 0-15 bytes are handled as the tail.
const int nblocks = len / 16;
uint32_t h1 = seed;
uint32_t h2 = seed;
uint32_t h3 = seed;
uint32_t h4 = seed;
const uint32_t c1 = 0x239b961b;
const uint32_t c2 = 0xab0e9789;
const uint32_t c3 = 0x38b34ae5;
const uint32_t c4 = 0xa1e38b93;
//----------
// body
// 'blocks' points just past the last complete block; the loop walks the blocks with
// negative indices counting up to zero, reading four 32-bit words per block.
const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);
for(int i = -nblocks; i; i++)
{
uint32_t k1 = getblock32(blocks,i*4+0);
uint32_t k2 = getblock32(blocks,i*4+1);
uint32_t k3 = getblock32(blocks,i*4+2);
uint32_t k4 = getblock32(blocks,i*4+3);
k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;
k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;
k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;
k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
}
//----------
// tail
const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
uint32_t k1 = 0;
uint32_t k2 = 0;
uint32_t k3 = 0;
uint32_t k4 = 0;
// The cases fall through on purpose: trailing bytes are accumulated into k4, k3, k2
// and k1 (four bytes each), and each word is mixed into its state word once its
// lowest byte has been added.
switch(len & 15)
{
case 15: k4 ^= tail[14] << 16;
case 14: k4 ^= tail[13] << 8;
case 13: k4 ^= tail[12] << 0;
k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
case 12: k3 ^= tail[11] << 24;
case 11: k3 ^= tail[10] << 16;
case 10: k3 ^= tail[ 9] << 8;
case 9: k3 ^= tail[ 8] << 0;
k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
case 8: k2 ^= tail[ 7] << 24;
case 7: k2 ^= tail[ 6] << 16;
case 6: k2 ^= tail[ 5] << 8;
case 5: k2 ^= tail[ 4] << 0;
k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
case 4: k1 ^= tail[ 3] << 24;
case 3: k1 ^= tail[ 2] << 16;
case 2: k1 ^= tail[ 1] << 8;
case 1: k1 ^= tail[ 0] << 0;
k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
};
//----------
// finalization
// Fold the input length into every state word, cross-add the words, run the
// avalanche step on each of them and cross-add once more.
h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;
h1 += h2; h1 += h3; h1 += h4;
h2 += h1; h3 += h1; h4 += h1;
h1 = fmix32(h1);
h2 = fmix32(h2);
h3 = fmix32(h3);
h4 = fmix32(h4);
h1 += h2; h1 += h3; h1 += h4;
h2 += h1; h3 += h1; h4 += h1;
((uint32_t*)out)[0] = h1;
((uint32_t*)out)[1] = h2;
((uint32_t*)out)[2] = h3;
((uint32_t*)out)[3] = h4;
}
//-----------------------------------------------------------------------------
// Computes a 128-bit hash of the 'len' bytes starting at 'key' using two 64-bit state
// words, each initialised to 'seed'. The result is written to 'out' as two consecutive
// uint64_t values, so 'out' must point to at least 16 writable bytes.
void MurmurHash3_x64_128 ( const void * key, const int len,
const uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
// Number of complete 16-byte blocks; the remaining 0-15 bytes are handled as the tail.
const int nblocks = len / 16;
uint64_t h1 = seed;
uint64_t h2 = seed;
const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
//----------
// body
// Unlike the x86 variants, the blocks are walked forwards from the start of the
// data, reading two 64-bit words per block.
const uint64_t * blocks = (const uint64_t *)(data);
for(int i = 0; i < nblocks; i++)
{
uint64_t k1 = getblock64(blocks,i*2+0);
uint64_t k2 = getblock64(blocks,i*2+1);
k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;
k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
}
//----------
// tail
const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
uint64_t k1 = 0;
uint64_t k2 = 0;
// The cases fall through on purpose: trailing bytes 8-14 are accumulated into k2 and
// bytes 0-7 into k1, and each word is mixed into its state word once its lowest byte
// has been added.
switch(len & 15)
{
case 15: k2 ^= ((uint64_t)tail[14]) << 48;
case 14: k2 ^= ((uint64_t)tail[13]) << 40;
case 13: k2 ^= ((uint64_t)tail[12]) << 32;
case 12: k2 ^= ((uint64_t)tail[11]) << 24;
case 11: k2 ^= ((uint64_t)tail[10]) << 16;
case 10: k2 ^= ((uint64_t)tail[ 9]) << 8;
case 9: k2 ^= ((uint64_t)tail[ 8]) << 0;
k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
case 8: k1 ^= ((uint64_t)tail[ 7]) << 56;
case 7: k1 ^= ((uint64_t)tail[ 6]) << 48;
case 6: k1 ^= ((uint64_t)tail[ 5]) << 40;
case 5: k1 ^= ((uint64_t)tail[ 4]) << 32;
case 4: k1 ^= ((uint64_t)tail[ 3]) << 24;
case 3: k1 ^= ((uint64_t)tail[ 2]) << 16;
case 2: k1 ^= ((uint64_t)tail[ 1]) << 8;
case 1: k1 ^= ((uint64_t)tail[ 0]) << 0;
k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
};
//----------
// finalization
// Fold the input length into both state words, cross-add them, run the avalanche
// step on each of them and cross-add once more.
h1 ^= len; h2 ^= len;
h1 += h2;
h2 += h1;
h1 = fmix64(h1);
h2 = fmix64(h2);
h1 += h2;
h2 += h1;
((uint64_t*)out)[0] = h1;
((uint64_t*)out)[1] = h2;
}
//-----------------------------------------------------------------------------

View File

@@ -0,0 +1,40 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// The name of the include guard had to be changed to avoid collision with Datasketches
// version of murmurhash algorithms in be/src/thirdparty/datasketches/MurmurHash3.h
#ifndef _APPLEBY_MURMURHASH3_H_
#define _APPLEBY_MURMURHASH3_H_
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
// Microsoft Visual Studio
#if defined(_MSC_VER) && (_MSC_VER < 1600)
typedef unsigned char uint8_t;
typedef unsigned int uint32_t;
typedef unsigned __int64 uint64_t;
// Other compilers
#else // defined(_MSC_VER)
#include <stdint.h>
#endif // !defined(_MSC_VER)
//-----------------------------------------------------------------------------
// All three functions hash the 'len' bytes starting at 'key' with the given 'seed' and
// write the result to 'out'.
// Writes a single uint32_t (4 bytes) to 'out'.
void MurmurHash3_x86_32 ( const void * key, int len, uint32_t seed, void * out );
// Writes four uint32_t values (16 bytes) to 'out'.
void MurmurHash3_x86_128 ( const void * key, int len, uint32_t seed, void * out );
// Writes two uint64_t values (16 bytes) to 'out'.
void MurmurHash3_x64_128 ( const void * key, int len, uint32_t seed, void * out );
//-----------------------------------------------------------------------------
#endif // _APPLEBY_MURMURHASH3_H_

View File

@@ -0,0 +1,9 @@
This library contains implementations of several variants of MurmurHash. It is
essentially a copy of the code from an external source: https://github.com/aappleby/smhasher.
The snapshot used in Impala was taken from the following commit:
https://github.com/aappleby/smhasher/tree/92cf3702fcfaadc84eb7bef59825a23e0cd84f56/
Note that the name of the include guard had to be changed to avoid a collision with the
DataSketches version of the MurmurHash algorithms in
be/src/thirdparty/datasketches/MurmurHash3.h

View File

@@ -338,5 +338,75 @@ TEST(BitUtil, CpuInfoIsSupportedHoist) {
} }
#endif #endif
// Checks that BitUtil::IntToByteBuffer(input) produces exactly the bytes listed in
// 'expected' (most significant byte first). 'input' is appended to the failure
// message to identify the failing case.
template<typename T>
void RunIntToByteArrayTest(const T& input, const std::vector<char>& expected) {
  const std::string result = BitUtil::IntToByteBuffer(input);
  EXPECT_EQ(expected.size(), result.size()) << input;
  // A size mismatch is already reported above; compare only the common prefix so that
  // neither container is read out of range.
  const size_t common_size = std::min(expected.size(), result.size());
  for (size_t i = 0; i < common_size; ++i) {
    EXPECT_EQ(expected[i], result[i]) << input << " (byte index " << i << ")";
  }
}
// __int128_t has no support for << operator. This is a specialized function for this
// type where in case of an EXPECT check failing we don't print the original input.
// Apart from the failure message it performs the same checks as the generic version.
template<>
void RunIntToByteArrayTest(const __int128_t& input, const std::vector<char>& expected) {
  const std::string result = BitUtil::IntToByteBuffer(input);
  EXPECT_EQ(expected.size(), result.size());
  // A size mismatch is already reported above; compare only the common prefix so that
  // neither container is read out of range.
  const size_t common_size = std::min(expected.size(), result.size());
  for (size_t i = 0; i < common_size; ++i) {
    EXPECT_EQ(expected[i], result[i]) << "byte index " << i;
  }
}
// The expected results come from running Java's BigInteger.toByteArray().
// The cases cover values around the points where the length of the minimal
// two's-complement representation changes (e.g. 127/128, -128/-129) as well as the
// minimum and maximum values of the tested integer widths.
TEST(BitUtil, IntToByteArray) {
  // Test int32 inputs.
  RunIntToByteArrayTest(0, {0});
  RunIntToByteArrayTest(127, {127});
  RunIntToByteArrayTest(128, {0, -128});
  RunIntToByteArrayTest(255, {0, -1});
  RunIntToByteArrayTest(256, {1, 0});
  RunIntToByteArrayTest(65500, {0, -1, -36});
  RunIntToByteArrayTest(123456123, {7, 91, -54, 123});
  RunIntToByteArrayTest(2147483647, {127, -1, -1, -1});
  RunIntToByteArrayTest(-1, {-1});
  RunIntToByteArrayTest(-128, {-128});
  RunIntToByteArrayTest(-129, {-1, 127});
  RunIntToByteArrayTest(-1024, {-4, 0});
  RunIntToByteArrayTest(-1025, {-5, -1});
  RunIntToByteArrayTest(-40000, {-1, 99, -64});
  RunIntToByteArrayTest(-68000, {-2, -10, 96});
  RunIntToByteArrayTest(-654321321, {-40, -1, -39, 87});
  RunIntToByteArrayTest(-2147483647, {-128, 0, 0, 1});
  // The literal -2147483648 is the negation of 2147483648, which doesn't fit into an
  // int and therefore has a 64-bit type. Spell int32_t min as an int expression so that
  // the int32 instantiation is the one being exercised here.
  RunIntToByteArrayTest(-2147483647 - 1, {-128, 0, 0, 0});

  // Test int64 inputs.
  RunIntToByteArrayTest(2147483648, {0, -128, 0, 0, 0});
  RunIntToByteArrayTest(2147489999, {0, -128, 0, 24, -49});
  RunIntToByteArrayTest(123498764226421, {112, 82, 75, -8, -49, 117});
  RunIntToByteArrayTest(45935528764226421, {0, -93, 50, 30, -70, -113, -69, 117});
  RunIntToByteArrayTest(9223372036854775807, {127, -1, -1, -1, -1, -1, -1, -1});
  RunIntToByteArrayTest(-2147483649, {-1, 127, -1, -1, -1});
  RunIntToByteArrayTest(-2147483650, {-1, 127, -1, -1, -2});
  RunIntToByteArrayTest(-226103951038195, {-1, 50, 92, 18, 80, -23, 13});
  RunIntToByteArrayTest(-18237603371852591, {-65, 52, -1, 17, 119, 92, -47});
  RunIntToByteArrayTest(-48227503771052199, {-1, 84, -87, 87, 65, 82, -105, 89});
  RunIntToByteArrayTest(-9223372036854775807, {-128, 0, 0, 0, 0, 0, 0, 1});
  // C++ compiler doesn't accept -9223372036854775808 (int64_t min) as a valid constant.
  RunIntToByteArrayTest(-9223372036854775807 - 1, {-128, 0, 0, 0, 0, 0, 0, 0});

  // Test int128 inputs.
  RunIntToByteArrayTest(__int128_t(9223372036854775807) + 1,
      {0, -128, 0, 0, 0, 0, 0, 0, 0});
  RunIntToByteArrayTest(__int128_t(9223372036854775807) + 2,
      {0, -128, 0, 0, 0, 0, 0, 0, 1});
  RunIntToByteArrayTest(__int128_t(123321456654789987) * 1000,
      {6, -81, 109, -45, 113, 68, 125, 74, -72});
  RunIntToByteArrayTest(__int128_t(7255211462147863907) * 120000,
      {0, -72, 92, -78, 56, 127, -30, 94, 113, 6, 64});
  RunIntToByteArrayTest(__int128_t(7255211462147863907) * 180000,
      {1, 20, -117, 11, 84, -65, -45, -115, -87, -119, 96});
}
} }

View File

@@ -23,9 +23,11 @@
#include <endian.h> #include <endian.h>
#endif #endif
#include <algorithm>
#include <climits> #include <climits>
#include <cstdint> #include <cstdint>
#include <limits> #include <limits>
#include <vector>
#include "common/compiler-util.h" #include "common/compiler-util.h"
#include "common/logging.h" #include "common/logging.h"
@@ -341,6 +343,33 @@ class BitUtil {
return floor + 1; return floor + 1;
} }
} }
/// The purpose of this function is to replicate Java's BigInteger.toByteArray()
/// function. Receives an int input (it can be any size) and returns a buffer that
/// holds the minimal two's-complement byte sequence of this number where the most
/// significant byte is in the zeroth element. A leading 0 (or -1) byte is only kept
/// when it is needed to preserve the sign of the number.
/// Note, the return value is stored in a string instead of in a vector of chars for
/// potential optimisations with small strings.
/// E.g. 520 = [2, 8]
/// E.g. 12065530 = [0, -72, 26, -6]
/// E.g. -129 = [-1, 127]
template<typename T>
static std::string IntToByteBuffer(T input) {
  std::string buffer;
  T value = input;
  for (size_t i = 0; i < sizeof(value); ++i) {
    // Applies a mask for a byte range on the input. The byte is held in an explicitly
    // signed type: plain 'char' is unsigned on some platforms, which would make the
    // sign checks below always take the same branch.
    const signed char value_to_save = static_cast<signed char>(value & 0xFF);
    buffer.push_back(static_cast<char>(value_to_save));
    // Remove the just processed part from the input so that we can exit early if there
    // is nothing left to process.
    value >>= 8;
    // For non-negative numbers stop once only zero bits remain and the sign bit of the
    // last saved byte is clear; for negative numbers stop once only one bits remain and
    // the sign bit of the last saved byte is set.
    if (value == 0 && value_to_save >= 0) break;
    if (value == -1 && value_to_save < 0) break;
  }
  // Bytes were collected least significant first; the result is most significant first.
  std::reverse(buffer.begin(), buffer.end());
  return buffer;
}
}; };
/// An encapsulation class of SIMD byteswap functions /// An encapsulation class of SIMD byteswap functions

View File

@@ -30,6 +30,7 @@ testdata/cluster/node_templates/cdh7/etc/init.d/kms
# See $IMPALA_HOME/LICENSE.txt # See $IMPALA_HOME/LICENSE.txt
be/src/gutil/* be/src/gutil/*
be/src/thirdparty/datasketches/* be/src/thirdparty/datasketches/*
be/src/thirdparty/murmurhash/*
be/src/thirdparty/mpfit/* be/src/thirdparty/mpfit/*
be/src/kudu/gutil be/src/kudu/gutil
www/highlight/* www/highlight/*

View File

@@ -42,7 +42,7 @@ DIRS=$(ls -d "${IMPALA_HOME}/be/src/"*/ | grep -v gutil | grep -v kudu |\
grep -v thirdparty | tr '\n' ' ') grep -v thirdparty | tr '\n' ' ')
# Include/exclude select thirdparty dirs. # Include/exclude select thirdparty dirs.
DIRS=$DIRS$(ls -d "${IMPALA_HOME}/be/src/thirdparty/"*/ | grep -v mpfit |\ DIRS=$DIRS$(ls -d "${IMPALA_HOME}/be/src/thirdparty/"*/ | grep -v mpfit |\
grep -v datasketches | tr '\n' ' ') grep -v datasketches | grep -v murmurhash | tr '\n' ' ')
PIPE_DIRS=$(echo "${DIRS}" | tr ' ' '|') PIPE_DIRS=$(echo "${DIRS}" | tr ' ' '|')
# Reduce the concurrency to one less than the number of cores in the system. Note than # Reduce the concurrency to one less than the number of cores in the system. Note than

View File

@@ -1001,4 +1001,29 @@ invisible_functions = [
[['notdistinct'], 'BOOLEAN', ['CHAR', 'CHAR'], 'impala::Operators::NotDistinct_Char_Char'], [['notdistinct'], 'BOOLEAN', ['CHAR', 'CHAR'], 'impala::Operators::NotDistinct_Char_Char'],
[['notdistinct'], 'BOOLEAN', ['DECIMAL', 'DECIMAL'], 'impala::DecimalOperators::NotDistinct_DecimalVal_DecimalVal'], [['notdistinct'], 'BOOLEAN', ['DECIMAL', 'DECIMAL'], 'impala::DecimalOperators::NotDistinct_DecimalVal_DecimalVal'],
[['notdistinct'], 'BOOLEAN', ['DATE', 'DATE'], 'impala::Operators::NotDistinct_DateVal_DateVal'], [['notdistinct'], 'BOOLEAN', ['DATE', 'DATE'], 'impala::Operators::NotDistinct_DateVal_DateVal'],
# Functions related to Apache Iceberg functionality.
[['iceberg_truncate_transform'], 'INT', ['INT', 'INT'],
'_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_6IntValES6_'],
[['iceberg_truncate_transform'], 'BIGINT', ['BIGINT', 'BIGINT'],
'_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_9BigIntValES6_'],
[['iceberg_truncate_transform'], 'DECIMAL', ['DECIMAL', 'INT'],
'_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_10DecimalValERKNS1_6IntValE'],
[['iceberg_truncate_transform'], 'DECIMAL', ['DECIMAL', 'BIGINT'],
'_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_10DecimalValERKNS1_9BigIntValE'],
[['iceberg_truncate_transform'], 'STRING', ['STRING', 'INT'],
'_ZN6impala16IcebergFunctions26TruncatePartitionTransformEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_6IntValE'],
[['iceberg_bucket_transform'], 'INT', ['INT', 'INT'],
'_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_6IntValES6_'],
[['iceberg_bucket_transform'], 'INT', ['BIGINT', 'INT'],
'_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_9BigIntValERKNS1_6IntValE'],
[['iceberg_bucket_transform'], 'INT', ['DECIMAL', 'INT'],
'_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_10DecimalValERKNS1_6IntValE'],
[['iceberg_bucket_transform'], 'INT', ['STRING', 'INT'],
'_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_6IntValE'],
[['iceberg_bucket_transform'], 'INT', ['DATE', 'INT'],
'_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_7DateValERKNS1_6IntValE'],
[['iceberg_bucket_transform'], 'INT', ['TIMESTAMP', 'INT'],
'_ZN6impala16IcebergFunctions24BucketPartitionTransformEPN10impala_udf15FunctionContextERKNS1_12TimestampValERKNS1_6IntValE'],
] ]