使用流程: External Function
文档目标:
通过本篇使用流程,您可以实现:
- 调用JAVA NLP 的离线模型(详见网址),解析云器Lakehouse 表中字符串的情绪
- 调用阿里云视觉智能开放平台服务(详见网址),解析云器Lakehouse 表中 url 指向的图片数据
(本篇最佳实践使用的环境为基于阿里云的云器Lakehouse )
操作步骤:
Step0:准备工作(授权操作)
本步骤目标是:允许云器 Lakehouse 集群访问客户侧的阿里云上的函数计算FC、对象存储服务(OSS)。为了完成这个目标,需要创建一个角色,让云器lakehouse 扮演这个角色去访问阿里云上函数计算FC和OSS服务。
1. 阿里云控制台:在阿里云控制台访问控制(RAM)中创建权限策略 (如:CzUdfOssAccess ):
- 进入阿里云 RAM 控制台
- 左侧导航栏 权限管理 -> 权限控制,在权限控制界面选择 创建权限策略
- 在创建权限策略页面选择脚本编辑的页签(将下面[] 中的 bucket 名称替换)
{
"Version": "1",
"Statement": [
{
"Effect": "Allow",
"Action": [
"oss:GetObject",
"oss:PutObject",
"oss:DeleteObject"
],
"Resource": [
"acs:oss:*:*: [bucket_name]/*",
"acs:oss:*:*:[bucket_name]/*"
]
}
]
}
2. 阿里云控制台:在阿里云 RAM 中创建角色(如:CzUDFRole):
- 在RAM 控制台左侧导航栏 身份管理 -> 角色,点击创建角色
- 在 创建角色 页面中,选择类型为 阿里云账号, 配置角色中填写自定义角色名称 如 (CzUDFRole),在选择信任的云账号 中选择 其他云账号,并写入:1384322691904283(云器Lakehouse 上海的云主账号),点击完成
- 编辑 AliyunFCFullAccess 权限策略 添加下面"acs:Service": "fc.aliyuncs.com"部分
{
"Version": "1",
"Statement": [
{
"Action": "fc:*",
"Resource": "*",
"Effect": "Allow"
},
{
"Action": "ram:PassRole",
"Resource": "*",
"Effect": "Allow",
"Condition": {
"StringEquals": {
"acs:Service": "fc.aliyuncs.com"
}
}
}
]
}
- 创建完成之后,点击为角色授权:
- 在系统策略中,将 AliyunFCFullAccess 策略授权给该角色 CzUDFRole
- 在自定义策略中,将刚创建的策略(CzUdfOssAccess)授权给该角色
3. 在角色 CzUDFRole 详情页中,获取该角色的 RoleARN 信息:
{
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"RAM": [
"acs:ram::1384322691904283:root"
],
"Service": [
"fc.aliyuncs.com"
]
}
}
],
"Version": "1"
}
场景1: 调用JAVA NLP 的离线模型:
1. 编写代码
- 基于 Hive UDF API 编写 UDF,以下是实现大小写转换的示例代码:
package com.example;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.Locale;
public class GenericUdfUpper extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
checkArgsSize(arguments, 1, 1);
checkArgPrimitive(arguments, 0);
if (((PrimitiveObjectInspector) arguments[0]).getPrimitiveCategory() != PrimitiveCategory.STRING) {
throw new UDFArgumentException("argument 0 requires to be string rather than " + arguments[0].getTypeName());
}
return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
Object arg = arguments[0].get();
if (arg == null) {
return null;
}
return ((String) arg).toUpperCase(Locale.ROOT);
}
@Override
public String getDisplayString(String[] children) {
return "upper";
}
}
- 编译代码生成 Jar 包和其他依赖文件,打包成 zip 压缩包
2. 上传函数程序包到指定路径
如: oss://hz-oss-lakehouse/functions/sentiment/UDF_code/SentimentAnalysis.zip
函数主类:com.clickzetta.nlp.GenericUDFSentiment
有两种方法上传文件到指定路径:
- 通过 OSS 客户端直接上传
- 在Lakehouse JDBC 客户端(不支持Lakehouse Web UI 使用 PUT 命令上传)通过 PUT 命令 将程序包上传至 Volume对象,并在创建函数 DDL 中引用 volume 路径。例如:
-- 上传文件到命名为 fc_volume 的 Volume对象:
PUT ./SentimentAnalysis.zip to volume fc_volume/udfs/SentimentAnalysis.zip;
-- 在创建函数时引用该 Volume 路径:
create external function public.sentiment_demo_hz
AS 'com.clickzetta.nlp.GenericUDFSentiment'
USING ARCHIVE 'volume://fc_volume/udfs/SentimentAnalysis.zip'
CONNECTION udf_sentiment_bj
WITH
PROPERTIES (
'remote.udf.api' = 'java8.hive2.v0'
);
您也可以指定内部volume。虽然您可以使用内部volume但是您在创建API CONNECTION中的code bucket参数必须填写外部地址。
- User Volume 格式地址:
volume:user://~/upper.jar
- Table Volume 格式地址
volume:table://table_name/upper.jar
table
表示使用 Table Volume 协议。
table_name
表示表名,需根据实际情况填写。
upper.jar
表示目标文件名。
3. 新建连接(Connection)
create api connection udf_sentiment_bj
type cloud_function
provider = 'aliyun'
region = 'cn-beijing'
role_arn = 'acs:ram::1222808864467016:role/czudfrole'
namespace = 'default'
code_bucket = 'derek-bj-oss';
参数解释:
-
api connection: 创建 API 类型的 Connection,用户调用第三方的服务接口;
-
type: 连接类型为云函数:cloud_function,其中具体的属性为:
- provider:云函数提供商,如 aliyun
- region:云函数所在区域,如'cn-shanghai'
- role_arn:创建云函数所扮演的角色,如 acs:ram::12228000000000000:role/czudfrole
- code_bucket:云函数程序文件所在路径的对象存储 bucket 名称
4.在 Lakehouse 创建 External Function
create external function public.sentiment_demo_hz
as 'com.clickzetta.nlp.GenericUDFSentiment'
using archive 'oss://hz-oss-lakehouse/functions/sentiment/UDF_code/SentimentAnalysis.zip'
connection udf_sentiment_hz
with properties (
'remote.udf.api' = 'java8.hive2.v0',
);
参数解释:
- as 后面跟的是 Java 函数的主类名
- using :仅支持编译后Java 程序。后面需要接参数 archive 表示程序包为 zip 格式的文件;jar 表示 java 程序的 jar 包文件。后可直接应用文件OSS 路径;如果文件已通过PUT 命令上传至 Volume对象,也可以通过 Volume 路径直接引用函数文件,例如:
USING ARCHIVE 'volume://fc_volume/udfs/SentimentAnalysis.zip'
- connection:表示程序中使用的 connection 对象,如 udf_sentiment_bj;其中属性信息:
- remote.udf.api:java UDF 请填写为 java8.hive2.v0
5. 执行语义情感分析:
构造测试数据:
create table tbl_wisdom_nlp(id int, qoute string);
insert into tbl_wisdom_nlp values(1,"Honesty and diligence should be your eternal mates");
insert into tbl_wisdom_nlp values(2,"If a man deceives me once, shame on him; if twice,shame on me");
insert into tbl_wisdom_nlp values(3,"I am so damn happy");
insert into tbl_wisdom_nlp values(4,"Today is Sunday");
insert into tbl_wisdom_nlp values(5,"Today is Monday");
执行语义分析:
set cz.sql.remote.udf.enabled = true;
select qoute, public.sentiment_demo(qoute) as sentiment from tbl_wisdom_nlp;
场景2: Python UDF: 调用第三方视觉处理平台 API 做图像解析
1. 代码文件为 video_contents.py:
from alibabacloud_imagerecog20190930.client import Client
from alibabacloud_imagerecog20190930.models import RecognizeFoodRequest
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions
from cz.udf import annotate
@annotate("string->string")
class image_to_text(object):
def evaluate(self,url):
if url is None:
return None
try:
config = Config(
access_key_id='xxxxx',
access_key_secret='xxxxxxxx',
endpoint='imagerecog.cn-shanghai.aliyuncs.com',
region_id='cn-shanghai'
)
# Initialize a request and set parameters
client = Client(config)
recognize_food_request = RecognizeFoodRequest(image_url=url)
runtime = RuntimeOptions()
response = client.recognize_food_with_options(recognize_food_request, runtime)
if len(str(response.body)) >= 1:
return str(response.body)
else:
return ""
except Exception as exc:
return "[error] " + exc.__str__()
finally:
pass
#if __name__ == "__main__":
# import sys
# to_text = image_to_text()
# for url in sys.argv[1:]:
# print(f"{to_text.evaluate(url)}")
2. 复用场景一中的 connection
3. 新建 External Function
create external function public.image_to_text
as 'video_contents.image_to_text' # 脚本名称 + 类名称
using archive 'oss://derek-bj-oss/bj_remote_udf/image_to_text/image_to_text.zip'
connection udf_sentiment_bj2
with properties (
'remote.udf.api' = 'python3.mc.v0',
);
参数解释:
- As 后面为python 模块名 + 主类名,如主程序文件为 video_contents.py,主类名为 image_to_text,as 后的参数为
'video_contents.image_to_text'
- using archive / file:py 文件需打包为表示程序包为 zip 格式的文件; 同时支持单文件脚本,用 file 参数指定
4. 建测试数据验证
将以下4个图片导入到 OSS 中,并生成 public url 存储到 Lakehouse 表中。可以直接用以下 SQL 构造测试表:
create table tbl_images(id int, url string);
insert into tbl_images values(1,'http://viapi-test.oss-cn-shanghai.aliyuncs.com/viapi-3.0domepic/imagerecog/RecognizeFood/RecognizeFood1.jpg');
insert into tbl_images values(2,'http://viapi-test.oss-cn-shanghai.aliyuncs.com/viapi-3.0domepic/imagerecog/RecognizeFood/RecognizeFood2.jpg');
insert into tbl_images values(3,'http://viapi-test.oss-cn-shanghai.aliyuncs.com/viapi-3.0domepic/imagerecog/RecognizeFood/RecognizeFood3.jpg');
insert into tbl_images values(4,'http://viapi-test.oss-cn-shanghai.aliyuncs.com/viapi-3.0domepic/imagerecog/RecognizeFood/RecognizeFood4.jpg');
执行查询:
set cz.sql.remote.udf.enabled = true;
select id, public.image_to_text(url) from tbl_images;