External Function Development Guide (Java)
This document describes how to develop UDF, UDAF, and UDTF external functions in Java.
Note: External Functions are currently in preview and must be enabled with the control parameter described in this document.
UDF
云器Lakehouse UDFs are developed against the Hive UDF API specification: scalar functions can be written with either GenericUDF (org.apache.hadoop.hive.ql.udf.generic.GenericUDF) or UDF (org.apache.hadoop.hive.ql.exec.UDF).
Developing a UDF
Create a Maven project and add the following dependency to pom.xml.
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.8</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.pentaho</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Write the UDF code. Example:
package com.example;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.Locale;

public class GenericUdfUpper extends GenericUDF {
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Validate that exactly one primitive STRING argument is passed and declare a STRING return type.
        checkArgsSize(arguments, 1, 1);
        checkArgPrimitive(arguments, 0);
        if (((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory() != PrimitiveCategory.STRING) {
            throw new UDFArgumentException("argument 0 requires to be string rather than " + arguments[0].getTypeName());
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Return NULL for NULL input, otherwise upper-case the string.
        Object arg = arguments[0].get();
        if (arg == null) {
            return null;
        }
        return ((String) arg).toUpperCase(Locale.ROOT);
    }

    @Override
    public String getDisplayString(String[] children) {
        return "upper";
    }
}
Package the project into a jar file (for example, with mvn package).
Upload the JAR file to a Volume
Upload the packaged jar file to an External Volume created in 云器Lakehouse.
First, create a CONNECTION object that points to your existing object storage location.
--Create a storage connection definition pointing to object storage
CREATE OR REPLACE STORAGE CONNECTION qn_hz_bucket_ramrole
TYPE oss
REGION = 'cn-hangzhou'
ROLE_ARN = 'acs:ram::1875653xxxxx:role/czudfrole'
ENDPOINT = 'oss-cn-hangzhou-internal.aliyuncs.com';
Next, create an EXTERNAL VOLUME object that mounts the specified path in object storage.
--Create the External Volume
CREATE EXTERNAL VOLUME qn_hz_bucket_vol
location 'oss://qn-hz-bucket/'
using connection qn_hz_bucket_ramrole
directory = (
enable=true,
auto_refresh=false
)
recursive=true;
Finally, use a local JDBC client to connect to the workspace and upload the jar file with the PUT command (note: the SQL editor in the Studio Web UI does not support uploading local files with PUT).
--Upload the packaged UDF JAR
PUT '/Users/Downloads/upper.jar' TO VOLUME qn_hz_bucket_vol FILE 'upper.jar';
--List the uploaded files
SHOW VOLUME DIRECTORY qn_hz_bucket_vol;
relative_path url size last_modified_time
----------------------------------------------- ------------------------------------------------------------------ ---- -------------------
data_parquet/data.csv oss://qn-hz-bucket/data_parquet/data.csv 34 2024-05-29 17:03:25
data_parquet/lakehouse_region_part00001.parquet oss://qn-hz-bucket/data_parquet/lakehouse_region_part00001.parquet 2472 2024-05-24 00:39:08
upper.jar oss://qn-hz-bucket/upper.jar 3161 2024-05-29 23:11:49
You can also use an internal volume. Note, however, that even when you use an internal volume, the code_bucket parameter of the API CONNECTION you create must still be set to an external object storage address.
- User Volume address format:
volume:user://~/upper.jar
- Table Volume address format:
volume:table://table_name/upper.jar
  - table: the Table Volume protocol.
  - table_name: the table name; replace it with your actual table.
  - upper.jar: the target file name.
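For reference, a minimal sketch of registering the same function from a JAR kept in the User Volume; it assumes upper.jar has already been uploaded there, and it reuses qn_hz_fc_connection, the API connection created in the next section:
--Create the External Function from a User Volume path (illustrative)
create external function public.upper_udf
as 'com.example.GenericUdfUpper'
USING FILE 'volume:user://~/upper.jar'
connection qn_hz_fc_connection
with properties (
'remote.udf.api' = 'java8.hive2.v0',
'remote.udf.protocol' = 'http.arrow.v0'
);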
Create the External Function
First, create a Connection object for the cloud Function Compute service.
create api connection qn_hz_fc_connection
type cloud_function
with properties (
'cloud_function.provider' = 'aliyun',
'cloud_function.region' = 'cn-hangzhou',
'cloud_function.role_arn' = 'acs:ram::1875653611111111:role/czudfrole',
'cloud_function.namespace' = 'default',
'cloud_function.code_bucket' = 'qn-hz-bucket'
);
Next, create the External Function. It reads the JAR file from the Volume object defined above and uses the previously defined Function Compute CONNECTION object to call the Function Compute service, which creates a corresponding function there.
create external function public.upper_udf
as 'com.example.GenericUdfUpper'
USING FILE 'volume://qn_hz_bucket_vol/upper.jar'
connection qn_hz_fc_connection
with properties (
'remote.udf.api' = 'java8.hive2.v0',
'remote.udf.protocol' = 'http.arrow.v0'
);
Test run
Run the UDF against test data or table data. Note: remote function access currently has to be enabled through the cz.sql.remote.udf.enabled parameter.
--Test the UDF
set cz.sql.remote.udf.enabled = true;
select public.upper_udf('hello') as upper;
select public.upper_udf(product_id) from product_grossing limit 50;
Users with access to the Alibaba Cloud console can log in to the Function Compute console and verify that, once the CREATE EXTERNAL FUNCTION command succeeds, 云器Lakehouse has automatically created a function there to execute the user-defined function.
When you run DROP FUNCTION public.upper_udf; to drop the function, the Lakehouse platform also deletes the corresponding function on the cloud provider side.
UDAF
UDAFs can be developed against the Hive 2.x UDAF specification, using GenericUDAFResolver and GenericUDAFEvaluator.
UDAF runtime environment:
- Java: 1.8 (the JDK distribution is provided by the cloud vendor's Function Compute runtime)
- Base libraries available at runtime:
  - org.apache.hive:hive-exec:2.3.4
  - org.apache.arrow:arrow-vector:11.0.0
  - org.apache.arrow:arrow-memory-netty:11.0.0
Developing a UDAF
Write the UDAF code based on GenericUDAFResolver and GenericUDAFEvaluator.
package com.example;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class sumint extends AbstractGenericUDAFResolver {
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        // Accept exactly one primitive argument and choose an evaluator based on its type.
        if (info.length != 1) {
            throw new UDFArgumentTypeException(info.length - 1, "Exactly one argument is expected.");
        }
        if (info[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Only primitive type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
        }
        if (((PrimitiveTypeInfo) info[0]).getPrimitiveCategory() == PrimitiveCategory.STRING) {
            return new SumStringEvaluator();
        } else if (((PrimitiveTypeInfo) info[0]).getPrimitiveCategory() == PrimitiveCategory.INT) {
            return new SumIntEvaluator();
        } else {
            throw new UDFArgumentTypeException(0, "Only string, int type arguments are accepted but " + info[0].getTypeName() + " was passed as parameter 1.");
        }
    }

    // Sums string-typed input by parsing each value as an integer.
    public static class SumStringEvaluator extends GenericUDAFEvaluator {
        private PrimitiveObjectInspector inputOI;

        static class SumAggregationBuffer implements AggregationBuffer {
            int sum;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAggregationBuffer sum = new SumAggregationBuffer();
            reset(sum);
            return sum;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggregationBuffer) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            if (parameters.length != 0 && inputOI.getPrimitiveJavaObject(parameters[0]) != null) {
                ((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(parameters[0]).toString());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            ((SumAggregationBuffer) agg).sum += Integer.parseInt(inputOI.getPrimitiveJavaObject(partial).toString());
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }
    }

    // Sums int-typed input.
    public static class SumIntEvaluator extends GenericUDAFEvaluator {
        private IntObjectInspector inputOI;

        static class SumAggregationBuffer implements AggregationBuffer {
            int sum;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            inputOI = (IntObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            SumAggregationBuffer sum = new SumAggregationBuffer();
            reset(sum);
            return sum;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            ((SumAggregationBuffer) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            ((SumAggregationBuffer) agg).sum += inputOI.get(parameters[0]);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            ((SumAggregationBuffer) agg).sum += inputOI.get(partial);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((SumAggregationBuffer) agg).sum;
        }
    }
}
Upload the JAR file to a Volume
Compile and package the code into a jar, then upload it to your object storage location or a Lakehouse Volume object.
--Upload the packaged UDAF JAR
PUT '/Users/Downloads/sumint.jar' TO VOLUME qn_hz_bucket_vol FILE 'sumint.jar';
--List the uploaded files
SHOW VOLUME DIRECTORY qn_hz_bucket_vol;
relative_path url size last_modified_time
------------- ----------------------------- ---- -------------------
upper.jar oss://qn-hz-bucket/upper.jar 3161 2024-05-29 23:11:49
sumint.jar oss://qn-hz-bucket/sumint.jar 1022 2024-05-30 01:10:28
Create the External Function
- Create the Connection object to the Function Compute service (see the UDF section above)
- Create the external function in the Lakehouse system
Syntax for creating a UDAF External Function:
CREATE EXTERNAL FUNCTION public.<funcName>
AS '<className>'
USING FILE 'oss://<bucket>/<pathToJar>'
CONNECTION <connectionName>
WITH PROPERTIES (
'remote.udf.api' = 'java8.hive2.v0',
'remote.udf.category' = 'AGGREGATOR');
Parameters:
- funcName: any valid identifier, for example my_agg
- className: the fully qualified class name of the GenericUDAFResolver developed in step 1, for example com.example.GenericUDAFSum
- bucket and pathToJar: the bucket and object path the jar was uploaded to in step 2
- connectionName: the name of the connection created in step 3, for example udf_deploy_0317
- keep the last two PROPERTIES as shown
Example:
--Create the External Function
create external function public.sumint
as 'com.example.sumint'
USING FILE 'volume://qn_hz_bucket_vol/sumint.jar'
connection qn_hz_fc_connection
with properties (
'remote.udf.api' = 'java8.hive2.v0',
'remote.udf.category' = 'AGGREGATOR'
);
Test run
Run the UDAF against test data or table data. Note: remote function access currently has to be enabled through the cz.sql.remote.udf.enabled parameter.
--Test the UDAF
set cz.sql.remote.udf.enabled = true;
select public.sumint(amount) from product_grossing;
UDTF
Developing a UDTF
A UDTF is developed by extending org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implementing three methods: initialize, process, and close. initialize is called first and returns the schema of the rows the UDTF emits (the number and types of output columns). After initialization, process is called to handle the input arguments; result rows are emitted through forward(). Finally, close() is called to clean up any resources that require it.
Write the UDTF code. Example:
package com.example;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

public class MyExplode extends GenericUDTF {
    private static Logger logger = LoggerFactory.getLogger(MyExplode.class);
    private ObjectInspector oi;
    private Object[] params;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Declare the output columns: (key, value) for maps, (value) for lists.
        oi = argOIs[0];
        final ObjectInspector.Category category = oi.getCategory();
        List<String> names = new ArrayList<>(2);
        List<ObjectInspector> types = new ArrayList<>(2);
        switch (category) {
            case MAP:
                logger.info("receive explode category : Map");
                names.add("key");
                names.add("value");
                final MapObjectInspector moi = (MapObjectInspector) this.oi;
                types.add(moi.getMapKeyObjectInspector());
                types.add(moi.getMapValueObjectInspector());
                params = new Object[2];
                break;
            case LIST:
                logger.info("receive explode category : List");
                names.add("value");
                final ListObjectInspector loi = (ListObjectInspector) oi;
                types.add(loi.getListElementObjectInspector());
                params = new Object[1];
                break;
            default:
                throw new UDFArgumentException("not supported category for function explode : " + category);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(names, types);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Emit one output row per map entry or list element via forward().
        if (args.length != 1 || Objects.isNull(args[0])) {
            throw new HiveException("Only 1 nonnull arg supported for function explode, but got " + args.length);
        }
        ObjectInspector.Category category = oi.getCategory();
        switch (category) {
            case MAP:
                final Map<?, ?> map = ((MapObjectInspector) oi).getMap(args[0]);
                final Iterator<? extends Map.Entry<?, ?>> it = map.entrySet().iterator();
                while (it.hasNext()) {
                    final Map.Entry<?, ?> entry = it.next();
                    params[0] = entry.getKey();
                    params[1] = entry.getValue();
                    forward(params);
                }
                break;
            case LIST:
                final List<?> list = ((ListObjectInspector) oi).getList(args[0]);
                final Iterator<?> itl = list.iterator();
                while (itl.hasNext()) {
                    params[0] = itl.next();
                    forward(params);
                }
                break;
        }
    }

    @Override
    public void close() throws HiveException {
        // Release references held by the UDTF instance.
        oi = null;
        for (int i = 0; i < params.length; i++) {
            params[i] = null;
        }
        params = null;
    }
}
Upload the JAR file to a Volume
Compile and package the code into a jar, then upload it to your object storage location or a Lakehouse Volume object.
--Upload the packaged UDTF JAR
PUT '/Users/Downloads/MyExplode.jar' TO VOLUME qn_hz_bucket_vol FILE 'MyExplode.jar';
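As in the earlier sections, you can list the volume contents to confirm that the upload succeeded; MyExplode.jar should now appear alongside the previously uploaded files:
--List the uploaded files
SHOW VOLUME DIRECTORY qn_hz_bucket_vol;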
Create the External Function
- Create the Connection object to the Function Compute service (see the UDF section above)
- Create the external function in the Lakehouse system
Syntax for creating a UDTF External Function:
CREATE EXTERNAL FUNCTION public.<funcName>
AS '<className>'
USING FILE 'oss://<bucket>/<pathToJar>'
CONNECTION <connectionName>
WITH PROPERTIES (
'remote.udf.api' = 'java8.hive2.v0',
'remote.udf.category' = 'TABLE_VALUED');
Parameters:
- funcName: any valid identifier, for example my_udtf
- className: the fully qualified class name of the GenericUDTF developed in step 1, for example com.example.MyGenericUDTF
- bucket and pathToJar: the bucket and object path the jar was uploaded to in step 2
- connectionName: the name of the connection created in step 3, for example my_function_conn
- keep the last two PROPERTIES as shown; a concrete example follows below
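For illustration, a minimal sketch that registers the MyExplode class developed above; the function name public.my_explode is made up for this example, and it reuses the volume and connection objects from the earlier sections:
--Create the External Function
create external function public.my_explode
as 'com.example.MyExplode'
USING FILE 'volume://qn_hz_bucket_vol/MyExplode.jar'
connection qn_hz_fc_connection
with properties (
'remote.udf.api' = 'java8.hive2.v0',
'remote.udf.category' = 'TABLE_VALUED'
);
--Enable remote function access before testing, as in the UDF and UDAF sections
set cz.sql.remote.udf.enabled = true;
The function can then be invoked on a map or array column of your own table, following the same test-run pattern shown in the UDF and UDAF sections.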
External Function type mapping
| CZ Type | ObjectInspector Interface | Object Type (default java.lang.*) |
|---|---|---|
| tinyint | JavaByteObjectInspector | Byte |
| smallint | JavaShortObjectInspector | Short |
| int | JavaIntObjectInspector | Integer |
| bigint | JavaLongObjectInspector | Long |
| float | JavaFloatObjectInspector | Float |
| double | JavaDoubleObjectInspector | Double |
| decimal | JavaHiveDecimalObjectInspector | org.apache.hadoop.hive.common.type.HiveDecimal |
| boolean | JavaBooleanObjectInspector | Boolean |
| char | JavaHiveCharObjectInspector | org.apache.hadoop.hive.common.type.HiveChar |
| varchar | JavaHiveVarcharObjectInspector | org.apache.hadoop.hive.common.type.HiveVarchar |
| string | JavaStringObjectInspector | String |
| binary | JavaBinaryObjectInspector | byte[] |
| date | JavaDateObjectInspector | java.sql.Date |
| timestamp | JavaTimestampObjectInspector | java.sql.Timestamp |
| void | JavaVoidObjectInspector | Void |
| array | StandardListObjectInspector | java.util.List |
| map | StandardMapObjectInspector | java.util.Map |
| struct | StandardStructObjectInspector | java.util.List |