From 1eb2b7a96466cc77aa2f5745b2cef4c814b7b557 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Mon, 30 Sep 2013 23:26:34 -0700 Subject: [PATCH] Add execution for vararg UDFs. Change-Id: I46e5670c09ac0b8e62f39dfc832fe880dd1dc995 Reviewed-on: http://gerrit.ent.cloudera.com:8080/572 Tested-by: jenkins Reviewed-by: Nong Li --- be/src/exprs/native-udf-expr.cc | 99 ++++++++++++++----- be/src/exprs/native-udf-expr.h | 14 ++- be/src/exprs/udf-util.cc | 23 ++++- be/src/exprs/udf-util.h | 4 + be/src/testutil/test-udfs.cc | 27 +++++ common/thrift/Exprs.thrift | 5 +- .../analysis/CreateFunctionStmtBase.java | 2 +- .../impala/analysis/FunctionCallExpr.java | 8 +- .../com/cloudera/impala/catalog/Function.java | 2 +- .../queries/QueryTest/load-ir-udfs.test | 11 ++- .../queries/QueryTest/load-native-udfs.test | 11 ++- .../queries/QueryTest/udf.test | 65 ++++++++++++ 12 files changed, 237 insertions(+), 34 deletions(-) diff --git a/be/src/exprs/native-udf-expr.cc b/be/src/exprs/native-udf-expr.cc index e7ed3b605..10a4f1bee 100644 --- a/be/src/exprs/native-udf-expr.cc +++ b/be/src/exprs/native-udf-expr.cc @@ -36,7 +36,8 @@ NativeUdfExpr::NativeUdfExpr(const TExprNode& node) udf_type_(node.udf_call_expr.binary_type), hdfs_location_(node.udf_call_expr.binary_location), symbol_name_(node.udf_call_expr.symbol_name), - has_var_args_(node.udf_call_expr.has_var_args), + vararg_start_idx_(node.udf_call_expr.__isset.vararg_start_idx ? + node.udf_call_expr.vararg_start_idx : -1), udf_wrapper_(NULL), codegen_(NULL), ir_udf_wrapper_(NULL) { @@ -133,10 +134,18 @@ void* NativeUdfExpr::ComputeFn(Expr* e, TupleRow* row) { } Status NativeUdfExpr::Prepare(RuntimeState* state, const RowDescriptor& desc) { - if (has_var_args_) return Status("Vararg UDFs not yet implemented."); if (state->llvm_codegen() == NULL) { return Status("UDFs cannot be evaluated with codegen disabled"); } + if (vararg_start_idx_ != -1) { + DCHECK_GT(GetNumChildren(), vararg_start_idx_); + // Allocate a scratch buffer for all the variable args. + varargs_input_.resize(GetNumChildren() - vararg_start_idx_); + for (int i = 0; i < varargs_input_.size(); ++i) { + varargs_input_[i] = CreateAnyVal(state->obj_pool(), + children_[vararg_start_idx_ + i]->type()); + } + } codegen_ = state->llvm_codegen(); RETURN_IF_ERROR(Expr::PrepareChildren(state, desc)); RETURN_IF_ERROR(GetIrComputeFn(state, &ir_udf_wrapper_)); @@ -147,19 +156,30 @@ Status NativeUdfExpr::Prepare(RuntimeState* state, const RowDescriptor& desc) { // Dynamically loads the pre-compiled UDF and codegens a function that calls each child's // codegen'd function, then passes those values to the UDF and returns the result. // Example generated IR for a UDF with signature -// SmallIntVal Identity(FunctionContext*, SmallIntVal*): -// -// define i32 @UdfWrapper(i8* %context, %"class.impala::TupleRow"* %row) { +// create function Udf(double, int...) returns double +// select Udf(1.0, 2, 3, 4, 5) +// define { i8, double } @UdfWrapper(i8* %context, %"class.impala::TupleRow"* %row) { // entry: -// %arg_val = call i32 @ExprWrapper(i8* %context, %"class.impala::TupleRow"* %row) -// %arg_ptr = alloca i32 -// store i32 %arg_val, i32* %arg_ptr -// %result = call i32 @_Z8IdentityPN10impala_udf15FunctionContextERKNS_11SmallIntValE( -// %"class.impala_udf::FunctionContext"* inttoptr -// (i64 51760208 to %"class.impala_udf::FunctionContext"*), -// i32* %arg_ptr) -// ret i32 %result -// } +// %arg_val = call { i8, double } +// @ExprWrapper(i8* %context, %"class.impala::TupleRow"* %row) +// %arg_ptr = alloca { i8, double } +// store { i8, double } %arg_val, { i8, double }* %arg_ptr +// %arg_val1 = call i64 @ExprWrapper1(i8* %context, %"class.impala::TupleRow"* %row) +// store i64 %arg_val1, i64* inttoptr (i64 89111072 to i64*) +// %arg_val2 = call i64 @ExprWrapper2(i8* %context, %"class.impala::TupleRow"* %row) +// store i64 %arg_val2, i64* inttoptr (i64 89111080 to i64*) +// %arg_val3 = call i64 @ExprWrapper3(i8* %context, %"class.impala::TupleRow"* %row) +// store i64 %arg_val3, i64* inttoptr (i64 89111088 to i64*) +// %arg_val4 = call i64 @ExprWrapper4(i8* %context, %"class.impala::TupleRow"* %row) +// store i64 %arg_val4, i64* inttoptr (i64 89111096 to i64*) +// %result = call { i8, double } +// @_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE( +// %"class.impala_udf::FunctionContext"* inttoptr +// (i64 37522464 to %"class.impala_udf::FunctionContext"*), +// {i8, double }* %arg_ptr, +// i32 4, +// i64* inttoptr (i64 89111072 to i64*)) +// ret { i8, double } %result Status NativeUdfExpr::GetIrComputeFn(RuntimeState* state, llvm::Function** fn) { LlvmCodeGen* codegen = state->llvm_codegen(); llvm::Function* udf; @@ -179,20 +199,43 @@ Status NativeUdfExpr::GetIrComputeFn(RuntimeState* state, llvm::Function** fn) { // Call children to populate remaining arguments llvm::Function::arg_iterator arg = udf->arg_begin(); ++arg; // Skip FunctionContext* arg - for (int i = 0; i < children_.size(); ++i, ++arg) { + for (int i = 0; i < GetNumChildren(); ++i) { + if (vararg_start_idx_ == i) { + // This is the start of the varargs, first add the number of args. + udf_args.push_back(codegen->GetIntConstant( + TYPE_INT, GetNumChildren() - vararg_start_idx_)); + ++arg; + } + llvm::Function* child_fn; RETURN_IF_ERROR(children_[i]->GetIrComputeFn(state, &child_fn)); DCHECK(child_fn != NULL); - llvm::Value* arg_val = builder.CreateCall(child_fn, args, "arg_val"); - llvm::Value* arg_ptr = builder.CreateAlloca(child_fn->getReturnType(), 0, "arg_ptr"); - builder.CreateStore(arg_val, arg_ptr); - // The *Val type returned by child_fn will be likely be lowered to a simpler type, so - // we must cast arg_ptr to the actual *Val struct pointer type expected by the UDF. - llvm::Value* cast_arg_ptr = - builder.CreateBitCast(arg_ptr, arg->getType(), "cast_arg_ptr"); - udf_args.push_back(cast_arg_ptr); + if (vararg_start_idx_ == -1 || i < vararg_start_idx_) { + // Either no varargs or arguments before varargs begin. + llvm::Value* arg_ptr = builder.CreateAlloca(child_fn->getReturnType(), 0, "arg_ptr"); + builder.CreateStore(arg_val, arg_ptr); + + // The *Val type returned by child_fn will be likely be lowered to a simpler type, so + // we must cast arg_ptr to the actual *Val struct pointer type expected by the UDF. + llvm::Value* cast_arg_ptr = + builder.CreateBitCast(arg_ptr, arg->getType(), "cast_arg_ptr"); + udf_args.push_back(cast_arg_ptr); + ++arg; + } else { + // Store the result of child(i) in varargs_input_[i - vararg_start_idx_] + int varargs_input_idx = i - vararg_start_idx_; + llvm::Type* arg_ptr_type = llvm::PointerType::get(arg_val->getType(), 0); + llvm::Value* arg_ptr = codegen->CastPtrToLlvmPtr( + arg_ptr_type, &varargs_input_[varargs_input_idx]); + builder.CreateStore(arg_val, arg_ptr); + } + } + + if (vararg_start_idx_ != -1) { + // Add all the accumulated vararg inputs as one input argument. + udf_args.push_back(codegen->CastPtrToLlvmPtr(arg->getType(), &varargs_input_[0])); } // Call UDF @@ -224,11 +267,19 @@ Status NativeUdfExpr::GetUdf(RuntimeState* state, llvm::Function** udf) { llvm::Type* return_type = CodegenAnyVal::GetType(codegen, type()); vector arg_types; arg_types.push_back(codegen->GetPtrType("class.impala_udf::FunctionContext")); - for (int i = 0; i < children_.size(); ++i) { + int num_fixed_args = vararg_start_idx_ >= 0 ? vararg_start_idx_ : children_.size(); + for (int i = 0; i < num_fixed_args; ++i) { llvm::Type* child_return_type = CodegenAnyVal::GetType(codegen, children_[i]->type()); arg_types.push_back(llvm::PointerType::get(child_return_type, 0)); } + + if (vararg_start_idx_ >= 0) { + llvm::Type* vararg_return_type = + CodegenAnyVal::GetType(codegen, children_[vararg_start_idx_]->type()); + arg_types.push_back(codegen->GetType(TYPE_INT)); + arg_types.push_back(llvm::PointerType::get(vararg_return_type, 0)); + } llvm::FunctionType* udf_type = llvm::FunctionType::get(return_type, arg_types, false); // Create a llvm::Function* with the generated type. This is only a function diff --git a/be/src/exprs/native-udf-expr.h b/be/src/exprs/native-udf-expr.h index e033d5486..b6cf99db4 100644 --- a/be/src/exprs/native-udf-expr.h +++ b/be/src/exprs/native-udf-expr.h @@ -20,6 +20,10 @@ #include "exprs/expr.h" #include "udf/udf.h" +namespace impala_udf { + class AnyVal; +}; + namespace impala { class TExprNode; @@ -70,8 +74,10 @@ class NativeUdfExpr: public Expr { std::string hdfs_location_; std::string symbol_name_; - // If true, this function has var args. - bool has_var_args_; + // If this function has var args, children()[vararg_start_idx_] is the + // first vararg argument. + // If this function does not have varargs, it is set to -1. + int vararg_start_idx_; // Function pointer to the JIT'd function produced by GetIrComputeFn(). Initialized and // called by ComputeFn(). @@ -82,6 +88,10 @@ class NativeUdfExpr: public Expr { LlvmCodeGen* codegen_; llvm::Function* ir_udf_wrapper_; + // Vector of preallocate input objects to pass to UDF if it has varargs. + // TODO: Move to to ExprContext + std::vector varargs_input_; + // Loads the native or IR function from HDFS and puts the result in *udf. Status GetUdf(RuntimeState* state, llvm::Function** udf); }; diff --git a/be/src/exprs/udf-util.cc b/be/src/exprs/udf-util.cc index 0cf11f25c..a97fc0c86 100644 --- a/be/src/exprs/udf-util.cc +++ b/be/src/exprs/udf-util.cc @@ -15,7 +15,9 @@ #include "exprs/udf-util.h" using namespace llvm; -using namespace impala; +using namespace impala_udf; + +namespace impala { CodegenAnyVal::CodegenAnyVal(LlvmCodeGen* codegen, LlvmCodeGen::LlvmBuilder* builder, PrimitiveType type, Value* value, const char* name) @@ -176,3 +178,22 @@ CodegenAnyVal CodegenAnyVal::GetNonNullVal(LlvmCodeGen* codegen, Value* value = Constant::getNullValue(val_type); return CodegenAnyVal(codegen, builder, type, value, name); } + +AnyVal* CreateAnyVal(ObjectPool* pool, PrimitiveType type) { + switch(type) { + case TYPE_BOOLEAN: return pool->Add(new BooleanVal); + case TYPE_TINYINT: return pool->Add(new TinyIntVal); + case TYPE_SMALLINT: return pool->Add(new SmallIntVal); + case TYPE_INT: return pool->Add(new IntVal); + case TYPE_BIGINT: return pool->Add(new BigIntVal); + case TYPE_FLOAT: return pool->Add(new FloatVal); + case TYPE_DOUBLE: return pool->Add(new DoubleVal); + case TYPE_STRING: return pool->Add(new StringVal); + case TYPE_TIMESTAMP: return pool->Add(new TimestampVal); + default: + DCHECK(false) << "Unsupported type: " << type; + return NULL; + } +} + +} diff --git a/be/src/exprs/udf-util.h b/be/src/exprs/udf-util.h index 452d3b116..192cb2cd4 100644 --- a/be/src/exprs/udf-util.h +++ b/be/src/exprs/udf-util.h @@ -17,6 +17,7 @@ #include "codegen/llvm-codegen.h" #include "runtime/primitive-type.h" +#include "udf/udf.h" namespace llvm { class Type; @@ -100,6 +101,9 @@ class CodegenAnyVal { const char* name = ""); }; +// Creates the corresponding AnyVal subclass for type. The object is added to the pool. +impala_udf::AnyVal* CreateAnyVal(ObjectPool* pool, PrimitiveType type); + } #endif diff --git a/be/src/testutil/test-udfs.cc b/be/src/testutil/test-udfs.cc index 508e52272..4acc71697 100644 --- a/be/src/testutil/test-udfs.cc +++ b/be/src/testutil/test-udfs.cc @@ -48,3 +48,30 @@ IntVal AllTypes( StringVal NoArgs(FunctionContext* context) { return StringVal(reinterpret_cast(const_cast("string")), 6); } + +IntVal VarSum(FunctionContext* context, int n, const IntVal* args) { + int result = 0; + bool is_null = true; + for (int i = 0; i < n; ++i) { + if (args[i].is_null) continue; + result += args[i].val; + is_null = false; + } + if (is_null) return IntVal::null(); + return IntVal(result); +} + +DoubleVal VarSumMultiply(FunctionContext* context, + const DoubleVal& d, int n, const IntVal* args) { + if (d.is_null) return DoubleVal::null(); + + int result = 0; + bool is_null = true; + for (int i = 0; i < n; ++i) { + if (args[i].is_null) continue; + result += args[i].val; + is_null = false; + } + if (is_null) return DoubleVal::null(); + return DoubleVal(result * d.val); +} diff --git a/common/thrift/Exprs.thrift b/common/thrift/Exprs.thrift index 4a34254f4..3b1d90579 100644 --- a/common/thrift/Exprs.thrift +++ b/common/thrift/Exprs.thrift @@ -119,7 +119,10 @@ struct TUdfCallExpr { 3: required Types.TFunctionBinaryType binary_type - 4: required bool has_var_args; + // If set, this udf has varargs and this is the index for the first variable + // argument. + // e.g. if the signature was fn(int, double, string...), the index would be 2. + 4: optional i32 vararg_start_idx } // This is essentially a union over the subclasses of Expr. diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java index 81f111d71..aa38f4776 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/CreateFunctionStmtBase.java @@ -78,7 +78,7 @@ public class CreateFunctionStmtBase extends StatementBase { params.setArg_types(types); params.setRet_type(fn_.getReturnType().toThrift()); - params.setHas_var_args(fn_.getHasVarArgs()); + params.setHas_var_args(fn_.hasVarArgs()); params.setComment(getComment()); params.setIf_not_exists(getIfNotExists()); return params; diff --git a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java index fdf3b4f2b..a78f72874 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/FunctionCallExpr.java @@ -66,7 +66,9 @@ public class FunctionCallExpr extends Expr { msg.udf_call_expr.setBinary_location(udf.getLocation().toString()); msg.udf_call_expr.setSymbol_name(udf.getSymbolName()); msg.udf_call_expr.setBinary_type(udf.getBinaryType()); - msg.udf_call_expr.setHas_var_args(udf.getHasVarArgs()); + if (udf.hasVarArgs()) { + msg.udf_call_expr.setVararg_start_idx(udf.getNumArgs() - 1); + } } else { Preconditions.checkState(fn_ instanceof OpcodeRegistry.BuiltinFunction); OpcodeRegistry.BuiltinFunction builtin = (OpcodeRegistry.BuiltinFunction)fn_; @@ -75,8 +77,10 @@ public class FunctionCallExpr extends Expr { msg.setUdf_call_expr(new TUdfCallExpr()); msg.udf_call_expr.setBinary_location(""); msg.udf_call_expr.setSymbol_name(functionName_.getFunction()); - msg.udf_call_expr.setHas_var_args(builtin.getHasVarArgs()); msg.udf_call_expr.setBinary_type(TFunctionBinaryType.BUILTIN); + if (fn_.hasVarArgs()) { + msg.udf_call_expr.setVararg_start_idx(fn_.getNumArgs() - 1); + } } else { // TODO: remove. All builtins will go through UDF_CALL. msg.node_type = TExprNodeType.FUNCTION_CALL; diff --git a/fe/src/main/java/com/cloudera/impala/catalog/Function.java b/fe/src/main/java/com/cloudera/impala/catalog/Function.java index 5f8e206de..3148310d1 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/Function.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/Function.java @@ -95,7 +95,7 @@ public class Function { public int getNumArgs() { return argTypes_.length; } public HdfsURI getLocation() { return location_; } public TFunctionBinaryType getBinaryType() { return binaryType_; } - public boolean getHasVarArgs() { return hasVarArgs_; } + public boolean hasVarArgs() { return hasVarArgs_; } public PrimitiveType getVarArgsType() { if (!hasVarArgs_) return PrimitiveType.INVALID_TYPE; Preconditions.checkState(argTypes_.length > 0); diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-ir-udfs.test b/testdata/workloads/functional-query/queries/QueryTest/load-ir-udfs.test index 131fb492f..9777a5303 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/load-ir-udfs.test +++ b/testdata/workloads/functional-query/queries/QueryTest/load-ir-udfs.test @@ -11,8 +11,9 @@ drop function if exists udf_test.identity(string); drop function if exists udf_test.all_types_fn( string, boolean, tinyint, smallint, int, bigint, float, double); drop function if exists udf_test.no_args(); +drop function if exists udf_test.var_sum(int...); +drop function if exists udf_test.var_sum_multiply(double, int...); -drop database if exists udf_test; create database if not exists udf_test; create function udf_test.identity(boolean) returns boolean @@ -56,4 +57,12 @@ symbol='_Z8AllTypesPN10impala_udf15FunctionContextERKNS_9StringValERKNS_10Boolea create function udf_test.no_args() returns string location '/test-warehouse/test-udfs.ll' symbol='_Z6NoArgsPN10impala_udf15FunctionContextE'; + +create function udf_test.var_sum(int...) returns int +location '/test-warehouse/test-udfs.ll' +symbol='_Z6VarSumPN10impala_udf15FunctionContextEiPKNS_6IntValE'; + +create function udf_test.var_sum_multiply(double, int...) returns double +location '/test-warehouse/test-udfs.ll' +symbol='_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE'; ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/load-native-udfs.test b/testdata/workloads/functional-query/queries/QueryTest/load-native-udfs.test index 6767687ec..2f545b1a2 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/load-native-udfs.test +++ b/testdata/workloads/functional-query/queries/QueryTest/load-native-udfs.test @@ -11,8 +11,9 @@ drop function if exists udf_test.identity(string); drop function if exists udf_test.all_types_fn( string, boolean, tinyint, smallint, int, bigint, float, double); drop function if exists udf_test.no_args(); +drop function if exists udf_test.var_sum(int...); +drop function if exists udf_test.var_sum_multiply(double, int...); -drop database if exists udf_test; create database if not exists udf_test; create function udf_test.identity(boolean) returns boolean @@ -56,4 +57,12 @@ symbol='_Z8AllTypesPN10impala_udf15FunctionContextERKNS_9StringValERKNS_10Boolea create function udf_test.no_args() returns string location '/test-warehouse/libTestUdfs.so' symbol='_Z6NoArgsPN10impala_udf15FunctionContextE'; + +create function udf_test.var_sum(int...) returns int +location '/test-warehouse/libTestUdfs.so' +symbol='_Z6VarSumPN10impala_udf15FunctionContextEiPKNS_6IntValE'; + +create function udf_test.var_sum_multiply(double, int...) returns double +location '/test-warehouse/libTestUdfs.so' +symbol='_Z14VarSumMultiplyPN10impala_udf15FunctionContextERKNS_9DoubleValEiPKNS_6IntValE'; ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/udf.test b/testdata/workloads/functional-query/queries/QueryTest/udf.test index 3088d866e..aed151729 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/udf.test +++ b/testdata/workloads/functional-query/queries/QueryTest/udf.test @@ -184,3 +184,68 @@ int ---- RESULT NULL ==== +---- QUERY +select udf_test.var_sum(NULL, NULL, NULL) +---- TYPES +int +---- RESULT +NULL +==== +---- QUERY +select udf_test.var_sum(1, 2, 3, 4, 5, 6) +---- TYPES +int +---- RESULT +21 +==== +---- QUERY +select tinyint_col, int_col, udf_test.var_sum(tinyint_col, int_col) +from functional.alltypestiny +---- TYPES +tinyint, int, int +---- RESULT +0,0,0 +1,1,2 +0,0,0 +1,1,2 +0,0,0 +1,1,2 +0,0,0 +1,1,2 +==== +---- QUERY +select udf_test.var_sum_multiply(NULL, 1, 2) +---- TYPES +double +---- RESULT +NULL +==== +---- QUERY +select udf_test.var_sum_multiply(1.0, 1, 2, NULL, 3) +---- TYPES +double +---- RESULT +NULL +==== +---- QUERY +select udf_test.var_sum_multiply(5.0, 1, 2, 3, 4, 5, 6) +---- TYPES +double +---- RESULT +105 +==== +---- QUERY +select tinyint_col, int_col, udf_test.var_sum_multiply(2, tinyint_col, int_col) +from functional.alltypestiny +---- TYPES +tinyint, int, double +---- RESULT +0,0,0 +1,1,4 +0,0,0 +1,1,4 +0,0,0 +1,1,4 +0,0,0 +1,1,4 +====