Update opcode registry to support UDF-interface builtins.

Migrating the rest of the builtins is a bigger, separate change, but I think
this is how builtins should be run when they are not cross-compiled to IR.
This mode is still useful when developing a builtin.

When run as cross compiled IR, we wouldn't do anything to distinguish between
a builtin and an external UDF.

Change-Id: I6aa336b22aa19b00507bad33c9df3978baa576cc
Reviewed-on: http://gerrit.ent.cloudera.com:8080/542
Tested-by: jenkins
Reviewed-by: Skye Wanderman-Milne <skye@cloudera.com>
This commit is contained in:
Nong Li
2013-09-25 20:31:42 -07:00
committed by Henry Robinson
parent 4bb1e8c854
commit e959e49b7c
14 changed files with 203 additions and 53 deletions

View File

@@ -48,6 +48,7 @@ add_library(Exprs
timezone_db.cc
tuple-is-null-predicate.cc
native-udf-expr.cc
udf-builtins.cc
udf-util.cc
utility-functions.cc
)

View File

@@ -225,20 +225,18 @@ class ExprTest : public testing::Test {
case TYPE_BIGINT:
EXPECT_EQ(*reinterpret_cast<int64_t*>(result), expected_result) << expr;
break;
case TYPE_FLOAT: {
case TYPE_FLOAT:
// Converting the float back from a string is inaccurate so convert
// the expected result to a string.
RawValue::PrintValue(reinterpret_cast<const void*>(&expected_result),
TYPE_FLOAT, -1, &expected_str);
EXPECT_EQ(*reinterpret_cast<string*>(result), expected_str) << expr;
break;
}
case TYPE_DOUBLE: {
case TYPE_DOUBLE:
RawValue::PrintValue(reinterpret_cast<const void*>(&expected_result),
TYPE_DOUBLE, -1, &expected_str);
EXPECT_EQ(*reinterpret_cast<string*>(result), expected_str) << expr;
break;
}
default:
ASSERT_TRUE(false) << "invalid TestValue() type: " << TypeToString(expr_type);
}
@@ -2658,11 +2656,19 @@ TEST_F(ExprTest, ResultsLayoutTest) {
}
}
// Exercises the builtins that are implemented against the UDF interface.
TEST_F(ExprTest, UdfBuiltins) {
  // These builtins currently don't run with codegen disabled, so the checks
  // are only performed when codegen is enabled.
  if (!disable_codegen_) {
    TestValue("udf_pi()", TYPE_DOUBLE, M_PI);
    TestValue("udf_abs(-1)", TYPE_DOUBLE, 1.0);
    TestStringValue("udf_lower('Hello_WORLD')", "hello_world");
  }
}
}
int main(int argc, char **argv) {
InitCommonRuntime(argc, argv, true);
::testing::InitGoogleTest(&argc, argv);
InitCommonRuntime(argc, argv, true);
InitFeSupport();
impala::LlvmCodeGen::InitializeLlvm();

View File

@@ -308,6 +308,7 @@ Status Expr::CreateExpr(ObjectPool* pool, const TExprNode& texpr_node, Expr** ex
*expr = pool->Add(new FloatLiteral(texpr_node));
return Status::OK;
case TExprNodeType::FUNCTION_CALL:
DCHECK(texpr_node.__isset.opcode);
*expr = pool->Add(new FunctionCall(texpr_node));
return Status::OK;
case TExprNodeType::INT_LITERAL:
@@ -520,12 +521,14 @@ Status Expr::Prepare(RuntimeState* state, const RowDescriptor& row_desc) {
PrepareChildren(state, row_desc);
// Not all exprs have opcodes (i.e. literals, agg-exprs)
DCHECK(opcode_ != TExprOpcode::INVALID_OPCODE);
compute_fn_ = OpcodeRegistry::Instance()->GetFunction(opcode_);
if (compute_fn_ == NULL) {
void* compute_fn_ptr =
OpcodeRegistry::Instance()->GetFunctionPtr(opcode_);
if (compute_fn_ptr == NULL) {
stringstream out;
out << "Expr::Prepare(): Opcode: " << opcode_ << " does not have a registry entry. ";
return Status(out.str());
}
compute_fn_ = reinterpret_cast<ComputeFn>(compute_fn_ptr);
return Status::OK;
}

View File

@@ -212,6 +212,17 @@ struct ExprValue {
};
// This is the superclass of all expr evaluation nodes.
//
// If codegen is enabled for the query, we will codegen as much of the expr evaluation
// as possible. This means all builtins will run through the codegen path and nothing
// (e.g. Exec nodes) will call GetValue(). Instead, they will call GetIrComputeFn().
//
// In order to call UDFs (external ones in a different binary/IR module), we need
// to use codegen to generate the wrapper to call the function, regardless of
// whether or not codegen is enabled for the query. If codegen is enabled for the
// query, the UDF will be wrapped and returned as an IR function in GetIrComputeFn().
// If codegen is disabled for the query, the UDF will be wrapped and jit compiled to
// a function that is called in GetValue().
class Expr {
public:
// typedef for compute functions.

View File

@@ -17,7 +17,9 @@
#include <vector>
#include <llvm/ExecutionEngine/ExecutionEngine.h>
#include "codegen/llvm-codegen.h"
#include "exprs/opcode-registry.h"
#include "exprs/udf-util.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/lib-cache.h"
#include "udf/udf-internal.h"
#include "util/debug-util.h"
@@ -204,10 +206,16 @@ Status NativeUdfExpr::GetIrComputeFn(RuntimeState* state, llvm::Function** fn) {
Status NativeUdfExpr::GetUdf(RuntimeState* state, llvm::Function** udf) {
LlvmCodeGen* codegen = state->llvm_codegen();
if (udf_type_ == TFunctionBinaryType::NATIVE) {
if (udf_type_ == TFunctionBinaryType::NATIVE ||
udf_type_ == TFunctionBinaryType::BUILTIN) {
void* udf_ptr;
RETURN_IF_ERROR(state->lib_cache()->GetFunctionPtr(
state, hdfs_location_, symbol_name_, &udf_ptr));
if (udf_type_ == TFunctionBinaryType::NATIVE) {
RETURN_IF_ERROR(state->lib_cache()->GetFunctionPtr(
state, hdfs_location_, symbol_name_, &udf_ptr));
} else {
udf_ptr = OpcodeRegistry::Instance()->GetFunctionPtr(opcode_);
}
DCHECK(udf_ptr != NULL);
// Convert UDF function pointer to llvm::Function*
// First generate the llvm::FunctionType* corresponding to the UDF.

View File

@@ -63,7 +63,7 @@ class NativeUdfExpr: public Expr {
// TODO: Get this from the to-be-implemented ExprContext instead
boost::scoped_ptr<impala_udf::FunctionContext> udf_context_;
// Native (.so) or IR (.ll)
// Native (.so), IR (.ll) or builtin
TFunctionBinaryType::type udf_type_;
// HDFS/local path and name of the compiled UDF binary

View File

@@ -30,9 +30,8 @@ class TupleRow;
class OpcodeRegistry {
public:
// Returns the function for this opcode. If the opcdoe is not valid,
// this function returns NULL
Expr::ComputeFn GetFunction(TExprOpcode::type opcode) {
// Returns the function ptr for this opcode.
void* GetFunctionPtr(TExprOpcode::type opcode) {
int index = static_cast<int>(opcode);
DCHECK_GE(index, 0);
DCHECK_LT(index, functions_.size());
@@ -61,20 +60,19 @@ class OpcodeRegistry {
}
// Populates all of the registered functions. Implemented in
// opcode-registry-init.cc which is an auto-generated file
// opcode-registry-init.cc which is an auto-generated file
void Init();
// Add a function to the registry.
void Add(TExprOpcode::type opcode, const Expr::ComputeFn& function) {
void Add(TExprOpcode::type opcode, void* fn) {
int index = static_cast<int>(opcode);
DCHECK_LT(index, functions_.size());
DCHECK_GE(index, 0);
functions_[index] = function;
functions_[index] = fn;
}
static OpcodeRegistry* instance_;
static boost::mutex instance_lock_;
std::vector<Expr::ComputeFn> functions_;
std::vector<void*> functions_;
};
}

View File

@@ -0,0 +1,44 @@
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "exprs/udf-builtins.h"
#include <ctype.h>
#include <math.h>
using namespace std;
namespace impala {
// Returns the absolute value of 'v'. A NULL argument is handed back unchanged.
DoubleVal UdfBuiltins::Abs(FunctionContext* context, const DoubleVal& v) {
  if (!v.is_null) {
    const double magnitude = fabs(v.val);
    return DoubleVal(magnitude);
  }
  // NULL in, NULL out: return the argument itself.
  return v;
}
// Returns the constant M_PI wrapped in a DoubleVal. Takes no SQL arguments.
DoubleVal UdfBuiltins::Pi(FunctionContext* context) {
  const DoubleVal pi(M_PI);
  return pi;
}
// Returns a copy of 'v' with tolower() applied to every byte. A NULL argument
// is handed back unchanged.
StringVal UdfBuiltins::Lower(FunctionContext* context, const StringVal& v) {
  if (v.is_null) return v;
  // The output has the same length as the input and is created through the
  // StringVal(context, len) constructor.
  StringVal lowered(context, v.len);
  int pos = 0;
  while (pos < v.len) {
    lowered.ptr[pos] = tolower(v.ptr[pos]);
    ++pos;
  }
  return lowered;
}
}

View File

@@ -0,0 +1,39 @@
// Copyright 2012 Cloudera Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef IMPALA_EXPRS_UDF_BUILTINS_H
#define IMPALA_EXPRS_UDF_BUILTINS_H
#include "udf/udf.h"
using namespace impala_udf;
namespace impala {
// Builtins written against the UDF interface. The builtins in the other files
// should be replaced to the UDF interface as well.
// This is just to illustrate how builtins against the UDF interface will be
// implemented.
class UdfBuiltins {
public:
static DoubleVal Abs(FunctionContext* context, const DoubleVal&);
static DoubleVal Pi(FunctionContext* context);
static StringVal Lower(FunctionContext* context, const StringVal&);
};
}
#endif

View File

@@ -106,6 +106,7 @@ cc_registry_preamble = '\
#include "exprs/string-functions.h"\n\
#include "exprs/timestamp-functions.h"\n\
#include "exprs/conditional-functions.h"\n\
#include "exprs/udf-builtins.h"\n\
#include "exprs/utility-functions.h"\n\
#include "opcode/functions.h"\n\
\n\
@@ -217,18 +218,16 @@ operators = []
meta_data_entries = {}
# Read in the function and add it to the meta_data_entries map
def add_function(fn_meta_data):
def add_function(fn_meta_data, udf_interface):
fn_name = fn_meta_data[0]
ret_type = fn_meta_data[1]
args = fn_meta_data[2]
be_fn = fn_meta_data[3]
entry = {}
entry["fn_name"] = fn_meta_data[0]
entry["fn_name"] = fn_name
entry["ret_type"] = fn_meta_data[1]
entry["args"] = fn_meta_data[2]
entry["be_fn"] = fn_meta_data[3]
entry["sql_names"] = fn_meta_data[4]
entry["udf_interface"] = udf_interface
if fn_name in meta_data_entries:
meta_data_entries[fn_name].append(entry)
@@ -273,7 +272,8 @@ def generate_be_registry_init(filename):
for entry in entries:
opcode = entry["opcode"]
be_fn = entry["be_fn"]
cc_registry_file.write(" this->Add(TExprOpcode::%s, %s);\n" % (opcode, be_fn))
cc_output = "TExprOpcode::%s, (void*)%s" % (opcode, be_fn)
cc_registry_file.write(" this->Add(%s);\n" % (cc_output))
cc_registry_file.write(cc_registry_epilogue)
cc_registry_file.close()
@@ -288,7 +288,12 @@ def generate_fe_registry_init(filename):
for fn in meta_data_entries:
entries = meta_data_entries[fn]
for entry in entries:
java_output = "FunctionOperator." + fn.upper()
java_output = ""
if entry["udf_interface"]:
java_output += "true"
else:
java_output += "false"
java_output += ", FunctionOperator." + fn.upper()
java_output += ", TExprOpcode." + entry["opcode"]
# Check the last entry for varargs indicator.
if entry["args"] and entry["args"][-1] == "...":
@@ -327,12 +332,17 @@ for function in impala_functions.functions:
if len(function) != 5:
print "Invalid function entry in impala_functions.py:\n\t" + repr(function)
sys.exit(1)
add_function(function)
add_function(function, False)
for function in impala_functions.udf_functions:
if len(function) != 5:
print "Invalid function entry in impala_functions.py:\n\t" + repr(function)
sys.exit(1)
add_function(function, True)
for function in generated_functions.functions:
if len(function) != 5:
print "Invalid function entry in generated_functions.py:\n\t" + repr(function)
sys.exit(1)
add_function(function)
add_function(function, False)
generate_opcodes()

View File

@@ -89,7 +89,7 @@ functions = [
'MathFunctions::QuotientDouble', ['quotient']],
['String_Substring', 'STRING', ['STRING', 'INT'], \
'StringFunctions::Substring<int32_t>', ['substr', 'substring']],
'StringFunctions::Substring<int32_t>', ['substr', 'substring']],
['String_Substring', 'STRING', ['STRING', 'BIGINT'], \
'StringFunctions::Substring<int64_t>', ['substr', 'substring']],
['String_Substring', 'STRING', ['STRING', 'INT', 'INT'], \
@@ -124,12 +124,12 @@ functions = [
['String_Upper', 'STRING', ['STRING'], 'StringFunctions::Upper', ['upper', 'ucase']],
['String_InitCap', 'STRING', ['STRING'], 'StringFunctions::InitCap', ['initcap']],
['String_Reverse', 'STRING', ['STRING'], 'StringFunctions::Reverse', ['reverse']],
['String_Translate', 'STRING', ['STRING', 'STRING', 'STRING'],
['String_Translate', 'STRING', ['STRING', 'STRING', 'STRING'],
'StringFunctions::Translate', ['translate']],
['String_Trim', 'STRING', ['STRING'], 'StringFunctions::Trim', ['trim']],
['String_Ltrim', 'STRING', ['STRING'], 'StringFunctions::Ltrim', ['ltrim']],
['String_Rtrim', 'STRING', ['STRING'], 'StringFunctions::Rtrim', ['rtrim']],
['String_Ascii', 'INT', ['STRING'], 'StringFunctions::Ascii', ['ascii']],
['String_Ascii', 'INT', ['STRING'], 'StringFunctions::Ascii', ['ascii']],
['String_Instr', 'INT', ['STRING', 'STRING'], 'StringFunctions::Instr', ['instr']],
['String_Locate', 'INT', ['STRING', 'STRING'], 'StringFunctions::Locate', ['locate']],
['String_Locate', 'INT', ['STRING', 'STRING', 'INT'], \
@@ -326,7 +326,7 @@ functions = [
'ConditionalFunctions::IsNull', ['isnull', 'ifnull', 'nvl']],
['Conditional_IsNull', 'TIMESTAMP', ['TIMESTAMP', 'TIMESTAMP'], \
'ConditionalFunctions::IsNull', ['isnull', 'ifnull', 'nvl']],
['Conditional_Coalesce', 'BOOLEAN', ['BOOLEAN', '...'], \
'ConditionalFunctions::CoalesceBool', ['coalesce']],
['Conditional_Coalesce', 'BIGINT', ['BIGINT', '...'], \
@@ -338,3 +338,11 @@ functions = [
['Conditional_Coalesce', 'TIMESTAMP', ['TIMESTAMP', '...'], \
'ConditionalFunctions::CoalesceTimestamp', ['coalesce']],
]
# These functions are implemented against the UDF interface.
# TODO: this list should subsume the one above when all builtins are migrated.
udf_functions = [
['Udf_Math_Pi', 'DOUBLE', [], 'UdfBuiltins::Pi', ['udf_pi']],
['Udf_Math_Abs', 'DOUBLE', ['DOUBLE'], 'UdfBuiltins::Abs', ['udf_abs']],
['Udf_String_Lower', 'STRING', ['STRING'], 'UdfBuiltins::Lower', ['udf_lower']],
]

View File

@@ -84,6 +84,10 @@ enum TFunctionType {
}
enum TFunctionBinaryType {
// Impala builtin. We can either run this interpreted or via codegen
// depending on the query option.
BUILTIN,
// Hive UDFs, loaded from *.jar
HIVE,

View File

@@ -23,14 +23,16 @@ import com.cloudera.impala.common.AnalysisException;
import com.cloudera.impala.opcode.FunctionOperator;
import com.cloudera.impala.thrift.TExprNode;
import com.cloudera.impala.thrift.TExprNodeType;
import com.cloudera.impala.thrift.TFunctionBinaryType;
import com.cloudera.impala.thrift.TUdfCallExpr;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
public class FunctionCallExpr extends Expr {
private final FunctionName functionName_;
// The udf function if this function call is for a UDF.
private Udf udf_;
// The function to call. This can either be a builtin or a UDF.
private Function fn_;
public FunctionCallExpr(FunctionName functionName, List<Expr> params) {
super();
@@ -57,15 +59,28 @@ public class FunctionCallExpr extends Expr {
@Override
protected void toThrift(TExprNode msg) {
if (udf_ != null) {
if (fn_ instanceof Udf) {
Udf udf = (Udf)fn_;
msg.node_type = TExprNodeType.UDF_CALL;
msg.setUdf_call_expr(new TUdfCallExpr());
msg.udf_call_expr.setBinary_location(udf_.getLocation().toString());
msg.udf_call_expr.setSymbol_name(udf_.getSymbolName());
msg.udf_call_expr.setBinary_type(udf_.getBinaryType());
msg.udf_call_expr.setHas_var_args(udf_.getHasVarArgs());
msg.udf_call_expr.setBinary_location(udf.getLocation().toString());
msg.udf_call_expr.setSymbol_name(udf.getSymbolName());
msg.udf_call_expr.setBinary_type(udf.getBinaryType());
msg.udf_call_expr.setHas_var_args(udf.getHasVarArgs());
} else {
msg.node_type = TExprNodeType.FUNCTION_CALL;
Preconditions.checkState(fn_ instanceof OpcodeRegistry.BuiltinFunction);
OpcodeRegistry.BuiltinFunction builtin = (OpcodeRegistry.BuiltinFunction)fn_;
if (builtin.udfInterface) {
msg.node_type = TExprNodeType.UDF_CALL;
msg.setUdf_call_expr(new TUdfCallExpr());
msg.udf_call_expr.setBinary_location("");
msg.udf_call_expr.setSymbol_name(functionName_.getFunction());
msg.udf_call_expr.setHas_var_args(builtin.getHasVarArgs());
msg.udf_call_expr.setBinary_type(TFunctionBinaryType.BUILTIN);
} else {
// TODO: remove. All builtins will go through UDF_CALL.
msg.node_type = TExprNodeType.FUNCTION_CALL;
}
msg.setOpcode(opcode);
}
}
@@ -78,8 +93,6 @@ public class FunctionCallExpr extends Expr {
argTypes[i] = this.children.get(i).getType();
}
Function fnDesc = null;
// First check if this is a builtin
FunctionOperator op = OpcodeRegistry.instance().getFunctionOperator(
functionName_.getFunction());
@@ -88,7 +101,7 @@ public class FunctionCallExpr extends Expr {
OpcodeRegistry.instance().getFunctionInfo(op, true, argTypes);
if (match != null) {
this.opcode = match.opcode;
fnDesc = match;
fn_ = match;
}
} else {
// Next check if it is a UDF
@@ -106,16 +119,15 @@ public class FunctionCallExpr extends Expr {
searchDesc, Function.CompareMode.IS_SUBTYPE);
if (fn != null) {
if (fn instanceof Udf) {
udf_ = (Udf)fn;
fnDesc = fn;
fn_ = fn;
} else {
throw new AnalysisException(functionName_ + "() is not a UDF");
}
}
}
if (fnDesc != null) {
PrimitiveType[] args = fnDesc.getArgs();
if (fn_ != null) {
PrimitiveType[] args = fn_.getArgs();
// Implicitly cast all the children to match the function if necessary
for (int i = 0; i < argTypes.length; ++i) {
// For varargs, we must compare with the last type in callArgs.argTypes.
@@ -124,7 +136,7 @@ public class FunctionCallExpr extends Expr {
castChild(args[ix], i);
}
}
this.type = fnDesc.getReturnType();
this.type = fn_.getReturnType();
} else {
String error = String.format("No matching function with signature: %s(%s).",
functionName_, Joiner.on(", ").join(argTypes));

View File

@@ -57,19 +57,24 @@ public class OpcodeRegistry {
public static class BuiltinFunction extends Function {
public TExprOpcode opcode;
public FunctionOperator operator;
// If true, this builtin is implemented against the Udf interface.
public final boolean udfInterface;
// Constructor for searching, specifying the op and arguments
public BuiltinFunction(FunctionOperator operator, PrimitiveType[] args) {
super(new FunctionName(operator.toString()),
args, PrimitiveType.INVALID_TYPE, false);
this.operator = operator;
this.udfInterface = false;
}
private BuiltinFunction(TExprOpcode opcode, FunctionOperator operator,
boolean varArgs, PrimitiveType ret, PrimitiveType[] args) {
private BuiltinFunction(boolean udfInterface, TExprOpcode opcode,
FunctionOperator operator, boolean varArgs, PrimitiveType ret,
PrimitiveType[] args) {
super(new FunctionName(opcode.toString()), args, ret, varArgs);
this.operator = operator;
this.opcode = opcode;
this.udfInterface = udfInterface;
}
}
@@ -191,8 +196,8 @@ public class OpcodeRegistry {
/**
* Add a function with the specified opcode/signature to the registry.
*/
public boolean add(FunctionOperator op, TExprOpcode opcode, boolean varArgs,
PrimitiveType retType, PrimitiveType ... args) {
public boolean add(boolean udfInterface, FunctionOperator op, TExprOpcode opcode,
boolean varArgs, PrimitiveType retType, PrimitiveType ... args) {
List<BuiltinFunction> functions;
Pair<FunctionOperator, Integer> lookup = Pair.create(op, args.length);
// Take the last argument's type as the vararg type.
@@ -218,7 +223,8 @@ public class OpcodeRegistry {
}
}
BuiltinFunction function = new BuiltinFunction(opcode, op, varArgs, retType, args);
BuiltinFunction function =
new BuiltinFunction(udfInterface, opcode, op, varArgs, retType, args);
if (functions.contains(function)) {
LOG.error("OpcodeRegistry: Function already exists: " + opcode);
return false;