mirror of
https://github.com/apache/impala.git
synced 2025-12-25 02:03:09 -05:00
IMPALA-11549: Support Hive GenericUdfs that return primitive java types
Before this patch only the Writable* types were accepted as return types in GenericUdfs, while some GenericUdfs in the wild return primitive Java types (e.g. Integer instead of IntWritable). For legacy Hive UDFs these return types were already handled, so the only change needed was to map the ObjectInspector subclasses (e.g. JavaIntObjectInspector) to the correct JavaUdfDataType in Impala.

Testing:
- Added a subclass of TestGenericUdf (TestGenericUdfWithJavaReturnTypes) that returns primitive Java types (inheriting in the opposite direction would probably be more logical, but the diff is smaller this way).
- Changed EE tests to also use TestGenericUdfWithJavaReturnTypes.
- Changed FE tests (UdfExecutorTest) to check both TestGenericUdfWithJavaReturnTypes and TestGenericUdf.
- Also added a test with the BINARY type to UdfExecutorTest, as this was forgotten in the original BINARY patch.

Change-Id: I30679045d6693ebd35718b6f1a22aaa4963c1e63
Reviewed-on: http://gerrit.cloudera.org:8080/19304
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
This commit is contained in:
committed by
Impala Public Jenkins
parent
a469a9cf19
commit
86740a7d35
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
|
||||
import org.apache.hadoop.io.BooleanWritable;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.FloatWritable;
|
||||
@@ -50,9 +51,14 @@ import java.util.Set;
|
||||
* Simple Generic UDFs for testing.
|
||||
*
|
||||
* Udf that takes a variable number of arguments of the same type and applies
|
||||
* the "+" operator to them. The "+" is a concatenation for string types. For
|
||||
* boolean types, it applies the OR operation. If only one argument is provided,
|
||||
* it returns that argument.
|
||||
* the "+" operator to them. The "+" is a concatenation for string and binary types.
|
||||
* For boolean types, it applies the OR operation. If only one argument is provided,
|
||||
* it returns that argument. If any argument is NULL, it returns NULL.
|
||||
*
|
||||
* For all argument types the return type is a Writable class (e.g. IntWritable).
|
||||
* Generic UDFs can also return Java primitive classes (e.g. Integer). A separate
|
||||
* UDF class (TestGenericUdfWithJavaReturnTypes) is created with similar behavior
|
||||
* as this but different return types.
|
||||
*
|
||||
* This class is a copy of the TestGenericUdf class in the FE. We need this class in a
|
||||
* separate project so we can test loading UDF jars that are not already on the
|
||||
@@ -66,7 +72,7 @@ import java.util.Set;
|
||||
*/
|
||||
public class TestGenericUdf extends GenericUDF {
|
||||
|
||||
private List<PrimitiveCategory> inputTypes_ = new ArrayList<>();
|
||||
private List<PrimitiveCategory> inputTypes_;
|
||||
private PrimitiveObjectInspector retTypeOI_;
|
||||
private PrimitiveCategory argAndRetType_;
|
||||
|
||||
@@ -94,6 +100,8 @@ public class TestGenericUdf extends GenericUDF {
|
||||
throw new UDFArgumentException("No arguments provided.");
|
||||
}
|
||||
|
||||
// Resetting here as initialize can be called more than once by Hive.
|
||||
inputTypes_ = new ArrayList<>();
|
||||
for (ObjectInspector oi : arguments) {
|
||||
if (!(oi instanceof PrimitiveObjectInspector)) {
|
||||
throw new UDFArgumentException("Found an input that is not a primitive.");
|
||||
@@ -102,8 +110,8 @@ public class TestGenericUdf extends GenericUDF {
|
||||
inputTypes_.add(poi.getPrimitiveCategory());
|
||||
}
|
||||
|
||||
// return type is always same as last argument
|
||||
retTypeOI_ = (PrimitiveObjectInspector) arguments[0];
|
||||
// the return type is always the same as the type of the first argument
|
||||
retTypeOI_ = getReturnObjectInspector((PrimitiveObjectInspector) arguments[0]);
|
||||
|
||||
argAndRetType_ = retTypeOI_.getPrimitiveCategory();
|
||||
|
||||
@@ -111,32 +119,40 @@ public class TestGenericUdf extends GenericUDF {
|
||||
return retTypeOI_;
|
||||
}
|
||||
|
||||
protected PrimitiveObjectInspector getReturnObjectInspector(
|
||||
PrimitiveObjectInspector oi) {
|
||||
// Simply returns the same object inspector. Subclasses can override this to return
|
||||
// different types of object inspectors.
|
||||
return oi;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object evaluate(DeferredObject[] arguments)
|
||||
throws HiveException {
|
||||
if (arguments.length != inputTypes_.size()) {
|
||||
throw new HiveException("Number of arguments passed in did not match number of " +
|
||||
"arguments expected.");
|
||||
"arguments expected. Expected: "
|
||||
+ inputTypes_.size() + " actual: " + arguments.length);
|
||||
}
|
||||
switch (argAndRetType_) {
|
||||
case BOOLEAN:
|
||||
return evaluateBoolean(arguments);
|
||||
return evaluateBooleanWrapped(arguments);
|
||||
case BYTE:
|
||||
return evaluateByte(arguments);
|
||||
return evaluateByteWrapped(arguments);
|
||||
case SHORT:
|
||||
return evaluateShort(arguments);
|
||||
return evaluateShortWrapped(arguments);
|
||||
case INT:
|
||||
return evaluateInt(arguments);
|
||||
return evaluateIntWrapped(arguments);
|
||||
case LONG:
|
||||
return evaluateLong(arguments);
|
||||
return evaluateLongWrapped(arguments);
|
||||
case FLOAT:
|
||||
return evaluateFloat(arguments);
|
||||
return evaluateFloatWrapped(arguments);
|
||||
case DOUBLE:
|
||||
return evaluateDouble(arguments);
|
||||
return evaluateDoubleWrapped(arguments);
|
||||
case STRING:
|
||||
return evaluateString(arguments);
|
||||
return evaluateStringWrapped(arguments);
|
||||
case BINARY:
|
||||
return evaluateBinary(arguments);
|
||||
return evaluateBinaryWrapped(arguments);
|
||||
case DATE:
|
||||
case TIMESTAMP:
|
||||
default:
|
||||
@@ -164,8 +180,7 @@ public class TestGenericUdf extends GenericUDF {
|
||||
}
|
||||
}
|
||||
|
||||
public BooleanWritable evaluateBoolean(DeferredObject[] inputs) throws HiveException {
|
||||
List<BooleanWritable> booleanInputs = new ArrayList<>();
|
||||
protected Boolean evaluateBoolean(DeferredObject[] inputs) throws HiveException {
|
||||
boolean finalBoolean = false;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -177,13 +192,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
boolean currentBool = ((BooleanWritable) input.get()).get();
|
||||
finalBoolean |= currentBool;
|
||||
}
|
||||
BooleanWritable resultBool = new BooleanWritable();
|
||||
resultBool.set(finalBoolean);
|
||||
return resultBool;
|
||||
return finalBoolean;
|
||||
}
|
||||
|
||||
public ByteWritable evaluateByte(DeferredObject[] inputs) throws HiveException {
|
||||
List<ByteWritable> byteInputs = new ArrayList<>();
|
||||
protected Byte evaluateByte(DeferredObject[] inputs) throws HiveException {
|
||||
byte finalByte = 0;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -195,13 +207,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
byte currentByte = ((ByteWritable) input.get()).get();
|
||||
finalByte += currentByte;
|
||||
}
|
||||
ByteWritable resultByte = new ByteWritable();
|
||||
resultByte.set(finalByte);
|
||||
return resultByte;
|
||||
return finalByte;
|
||||
}
|
||||
|
||||
public ShortWritable evaluateShort(DeferredObject[] inputs) throws HiveException {
|
||||
List<ShortWritable> shortInputs = new ArrayList<>();
|
||||
protected Short evaluateShort(DeferredObject[] inputs) throws HiveException {
|
||||
short finalShort = 0;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -213,13 +222,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
short currentShort = ((ShortWritable) input.get()).get();
|
||||
finalShort += currentShort;
|
||||
}
|
||||
ShortWritable resultShort = new ShortWritable();
|
||||
resultShort.set(finalShort);
|
||||
return resultShort;
|
||||
return finalShort;
|
||||
}
|
||||
|
||||
public IntWritable evaluateInt(DeferredObject[] inputs) throws HiveException {
|
||||
List<IntWritable> intInputs = new ArrayList<>();
|
||||
protected Integer evaluateInt(DeferredObject[] inputs) throws HiveException {
|
||||
int finalInt = 0;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -231,13 +237,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
int currentInt = ((IntWritable) input.get()).get();
|
||||
finalInt += currentInt;
|
||||
}
|
||||
IntWritable resultInt = new IntWritable();
|
||||
resultInt.set(finalInt);
|
||||
return resultInt;
|
||||
return finalInt;
|
||||
}
|
||||
|
||||
public LongWritable evaluateLong(DeferredObject[] inputs) throws HiveException {
|
||||
List<LongWritable> longInputs = new ArrayList<>();
|
||||
protected Long evaluateLong(DeferredObject[] inputs) throws HiveException {
|
||||
long finalLong = 0;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -249,13 +252,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
long currentLong = ((LongWritable) input.get()).get();
|
||||
finalLong += currentLong;
|
||||
}
|
||||
LongWritable resultLong = new LongWritable();
|
||||
resultLong.set(finalLong);
|
||||
return resultLong;
|
||||
return finalLong;
|
||||
}
|
||||
|
||||
public FloatWritable evaluateFloat(DeferredObject[] inputs) throws HiveException {
|
||||
List<FloatWritable> floatInputs = new ArrayList<>();
|
||||
protected Float evaluateFloat(DeferredObject[] inputs) throws HiveException {
|
||||
float finalFloat = 0.0F;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -267,13 +267,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
float currentFloat = ((FloatWritable) input.get()).get();
|
||||
finalFloat += currentFloat;
|
||||
}
|
||||
FloatWritable resultFloat = new FloatWritable();
|
||||
resultFloat.set(finalFloat);
|
||||
return resultFloat;
|
||||
return finalFloat;
|
||||
}
|
||||
|
||||
public DoubleWritable evaluateDouble(DeferredObject[] inputs) throws HiveException {
|
||||
List<DoubleWritable> doubleInputs = new ArrayList<>();
|
||||
protected Double evaluateDouble(DeferredObject[] inputs) throws HiveException {
|
||||
double finalDouble = 0.0;
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -285,13 +282,10 @@ public class TestGenericUdf extends GenericUDF {
|
||||
double currentDouble = ((DoubleWritable) input.get()).get();
|
||||
finalDouble += currentDouble;
|
||||
}
|
||||
DoubleWritable resultDouble = new DoubleWritable();
|
||||
resultDouble.set(finalDouble);
|
||||
return resultDouble;
|
||||
return finalDouble;
|
||||
}
|
||||
|
||||
public Text evaluateString(DeferredObject[] inputs) throws HiveException {
|
||||
List<String> stringInputs = new ArrayList<>();
|
||||
protected String evaluateString(DeferredObject[] inputs) throws HiveException {
|
||||
String finalString = "";
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
@@ -303,13 +297,12 @@ public class TestGenericUdf extends GenericUDF {
|
||||
String currentString = ((Text) input.get()).toString();
|
||||
finalString += currentString;
|
||||
}
|
||||
Text resultString = new Text();
|
||||
resultString.set(finalString);
|
||||
return resultString;
|
||||
return finalString;
|
||||
}
|
||||
|
||||
public BytesWritable evaluateBinary(DeferredObject[] inputs) throws HiveException {
|
||||
byte[] result = null;
|
||||
protected byte[] evaluateBinary(DeferredObject[] inputs) throws HiveException {
|
||||
int resultLength = 0;
|
||||
|
||||
for (DeferredObject input : inputs) {
|
||||
if (input == null) {
|
||||
return null;
|
||||
@@ -319,15 +312,87 @@ public class TestGenericUdf extends GenericUDF {
|
||||
"Expected BytesWritable but got " + input.get().getClass());
|
||||
}
|
||||
byte[] currentArray = ((BytesWritable) input.get()).getBytes();
|
||||
// Unlike other functions, simply return last argument.
|
||||
result = currentArray;
|
||||
resultLength += currentArray.length;
|
||||
}
|
||||
int pos = 0;
|
||||
byte[] result = new byte[resultLength];
|
||||
for (DeferredObject input : inputs) {
|
||||
byte[] currentArray = ((BytesWritable) input.get()).getBytes();
|
||||
System.arraycopy(
|
||||
currentArray, 0, result, pos, currentArray.length);
|
||||
pos += currentArray.length;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// The evaluate*Wrapped functions below get the result from evaluate*
|
||||
// and wrap it in a Writable* class.
|
||||
|
||||
protected Object evaluateBooleanWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
BooleanWritable resultBool = new BooleanWritable();
|
||||
resultBool.set(evaluateBoolean(inputs));
|
||||
return resultBool;
|
||||
}
|
||||
|
||||
protected Object evaluateByteWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
ByteWritable resultByte = new ByteWritable();
|
||||
resultByte.set(evaluateByte(inputs));
|
||||
return resultByte;
|
||||
}
|
||||
|
||||
protected Object evaluateShortWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
ShortWritable resultShort = new ShortWritable();
|
||||
resultShort.set(evaluateShort(inputs));
|
||||
return resultShort;
|
||||
}
|
||||
|
||||
protected Object evaluateIntWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
IntWritable resultInt = new IntWritable();
|
||||
resultInt.set(evaluateInt(inputs));
|
||||
return resultInt;
|
||||
}
|
||||
|
||||
protected Object evaluateLongWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
LongWritable resultLong = new LongWritable();
|
||||
resultLong.set(evaluateLong(inputs));
|
||||
return resultLong;
|
||||
}
|
||||
|
||||
protected Object evaluateFloatWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
FloatWritable resultFloat = new FloatWritable();
|
||||
resultFloat.set(evaluateFloat(inputs));
|
||||
return resultFloat;
|
||||
}
|
||||
|
||||
protected Object evaluateDoubleWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
DoubleWritable resultDouble = new DoubleWritable();
|
||||
resultDouble.set(evaluateDouble(inputs));
|
||||
return resultDouble;
|
||||
}
|
||||
|
||||
protected Object evaluateStringWrapped(DeferredObject[] inputs) throws HiveException {
|
||||
Text resultString = new Text();
|
||||
resultString.set(evaluateString(inputs));
|
||||
return resultString;
|
||||
}
|
||||
|
||||
protected Object evaluateBinaryWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
byte[] result = evaluateBinary(inputs);
|
||||
if (result == null) return null;
|
||||
BytesWritable resultBinary = new BytesWritable();
|
||||
if (result != null) resultBinary.set(result, 0, result.length);
|
||||
resultBinary.set(result, 0, result.length);
|
||||
return resultBinary;
|
||||
}
|
||||
|
||||
private String getSignatureString(PrimitiveCategory argAndRetType_,
|
||||
protected String getSignatureString(PrimitiveCategory argAndRetType_,
|
||||
List<PrimitiveCategory> inputTypes_) {
|
||||
return argAndRetType_ + "TestGenericUdf(" + Joiner.on(",").join(inputTypes_) + ")";
|
||||
}
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.impala;
|
||||
|
||||
import org.apache.hadoop.hive.ql.metadata.HiveException;
|
||||
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
|
||||
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
|
||||
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector;
|
||||
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Simple Generic UDFs for testing.
|
||||
*
|
||||
* This class overrides a few methods in TestGenericUdf to return primitive
|
||||
* Java types instead of Hive's writable classes. Otherwise this class behaves
|
||||
* exactly the same way as TestGenericUdf. See TestGenericUdf for more information.
|
||||
*
|
||||
* Similarly to TestGenericUdf this class also has copy in the FE.
|
||||
*
|
||||
*/
|
||||
public class TestGenericUdfWithJavaReturnTypes extends TestGenericUdf {
|
||||
|
||||
public TestGenericUdfWithJavaReturnTypes() {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrimitiveObjectInspector getReturnObjectInspector(
|
||||
PrimitiveObjectInspector oi) {
|
||||
PrimitiveTypeInfo typeInfo = oi.getTypeInfo();
|
||||
return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(typeInfo);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getDisplayString(String[] children) {
|
||||
return "TestGenericUdfWithJavaReturnTypes";
|
||||
}
|
||||
|
||||
// The evaluate*Wrapped functions below simply return the results of
|
||||
// evaluate*.
|
||||
|
||||
@Override
|
||||
protected Object evaluateBooleanWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateBoolean(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateByteWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateByte(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateShortWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateShort(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateIntWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateInt(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateLongWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateLong(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateFloatWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateFloat(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateDoubleWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateDouble(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateStringWrapped(DeferredObject[] inputs) throws HiveException {
|
||||
return evaluateString(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object evaluateBinaryWrapped(DeferredObject[] inputs)
|
||||
throws HiveException {
|
||||
return evaluateBinary(inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getSignatureString(PrimitiveCategory argAndRetType_,
|
||||
List<PrimitiveCategory> inputTypes_) {
|
||||
return argAndRetType_ + "TestGenericUdfWithJavaReturnTypes(" +
|
||||
Joiner.on(",").join(inputTypes_) + ")";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user