创建 LLM 函数分析公司行业

目标:利用大语言模型(LLM)服务,根据 Lakehouse 客户表中的公司名称列,填写国家规范的所属一级行业二级行业的信息。效果如下:

Step1:准备开发环境

  1. 安装 Docker:确保您的本地上安装了 Docker:https://www.docker.com/

  2. 拉取 Docker 镜像。在本地命令行终端(如 MacOS 的 ternmial)执行:

    [Local]# docker pull quay.io/pypa/manylinux2014_x86_64:2022-10-25-fbea779
  3. 启动 Docker 容器。该容器基于 manylinux2014_x86_64 镜像,并配置为使用 Python 3.10 环境

    [Local]# docker run -it --name cz_func --env PATH="/opt/python/cp310-cp310/bin:$PATH" quay.io/pypa/manylinux2014_x86_64:2022-10-25-fbea779 bash

4. 在 /root 目录下创建文件夹 cz_llm

[root@docker root]# cd /root ; mkdir cz_llm 
[root@docker cz_llm]# cd cz_llm
[root@docker cz_llm]# touch cz_llm.py

5. cz_llm.py 中的程序代码如下:

import os
from cz.udf import annotate
import dashscope
from http import HTTPStatus
import json
import sys

@annotate("*->string")
class llm_call(object):
    def evaluate(self, text, prompt, api_key, model_name, temperature=0.7, enable_search=False):

        # 设置 API 密钥
        dashscope.api_key = api_key

        # 构建消息
        messages = [
            {"role": "system", "content": prompt},
            {"role": "user", "content": text}
        ]

        try:
            # 调用模型(非流式输出)
            response = dashscope.Generation.call(
                model=model_name,
                messages=messages,
                stream=False,  # 关闭流式输出
                result_format='message',
                temperature=temperature,
                enable_search=enable_search,
                top_p=0.8
            )

            # 处理响应
            if response.status_code == HTTPStatus.OK:
                # 非流式输出直接获取完整内容
                if hasattr(response.output, 'choices') and len(response.output.choices) > 0:
                    if hasattr(response.output.choices[0].message, 'content'):
                        return response.output.choices[0].message.content
                    else:
                        return "Error: No content in response"
                else:
                    return "Error: No choices in response"
            else:
                # 返回错误信息
                return f"Error: Request id: {response.request_id}, Status code: {response.status_code}, error code: {response.code}, error 
message: {response.message}"

        except Exception as e:
            # 返回错误信息
            return f"Error: {str(e)}"

# 测试代码
if __name__ == "__main__":
    # 创建实例
    llm = llm_call()
    
    # 配置参数
    API_KEY = "sk-xxxxxx"  # 替换为你的API密钥
    MODEL_NAME = "qwen-max"  # 或 qwen-plus, qwen-max 等
    
    # 测试示例
    test_text = '小红书'
    test_prompt = '请返回该公司的国家规范的一级、二级行业,直接输出结果:一级行业":"xxx","二级行业":"xxx",言简意赅'
    
    print("正在调用LLM...")
    result = llm.evaluate(test_text, test_prompt, API_KEY, MODEL_NAME, 0, True)
    
    print(f"\n输入文本: {test_text}")
    print(f"系统提示: {test_prompt}")
    print(f"LLM响应: {result}")

Step2:下载第三方库

程序依赖第三方包:dashscope 需要进行下载(其余为Python 内置, oshttpjsonsys 为 Python 内置库无需下载。 cz.udf 创建函数时系统会默认添加)

在开发环境中的命令行终端执行:

[root@docker cz_llm]# pwd
/root/cz_llm

[root@docker cz_llm]# pip install dashscope -t .

此时目录结构类似于:

Step3:本地调试

将以3行做修改,因为当前环境还没有加载 cz.udf 库:

...
2 #from cz.udf import annotate   # 注释掉
...
8 #@annotate("*->string")  # 注释掉
...
56 API_KEY = "sk-xxxxxx"  # 替换为你的API密钥

其中 API_KEY 是阿里云百炼平台的 API-KEY ,需要您注册阿里云账号,登录后在这里获取:阿里云百炼

注释掉上面两行之后,保存退出编辑脚本。执行:

[root@docker cz_llm]# export PYTHONPATH="${_PWD}:${_PWD}/lib"
[root@docker cz_llm]# python cz_llm.py 
正在调用LLM...

输入文本: 小红书
系统提示: 请返回该公司的国家规范的一级、二级行业,直接输出结果:一级行业":"xxx","二级行业":"xxx",言简意赅
LLM响应: "一级行业":"互联网","二级行业":"社交媒体"

Step4:打包上传

打包之前,请将上面注释掉的两行,解除注释。

...
2 from cz.udf import annotate   # 去掉注释
...
8 @annotate("*->string")  # 去掉注释

执行打包命令,保证当前目录为程序目录(本示例为 /root/cz_llm

[root@docker cz_llm]# pwd
/root/cz_llm
[root@docker cz_llm]# zip -rq ../cz_llm.zip ./
[root@docker cz_llm]# ls ../

您会发现在 /root 目录下有一个 cz_llm.zip 文件,将这个文件拷贝到 Lakehouse USER VOLUME 对象中:

在 Docker 宿主机中执行:

[Local]# docker cp cz_func:/root/cz_llm.zip ~/Downloads

现在 cz_llm.zip 在宿主机的用户的 Downloads 目录下

我们用 Lakehouse JDBC 客户端(请参考 Lakehouse JDBC 客户端),将文件 put 到 Lakehouse USER VOLUME 中:

PUT '/Users/derekmeng/Downloads/transform_company_id.zip' to USER VOLUME;

Step5:创建并使用函数:

本步骤依赖您提前创建好 API connection,创建过程请参考:API Connection

CREATE EXTERNAL FUNCTION public.fc_cz_llm
    AS 'cz_llm.llm_call'   -- 不带py后缀的主程序文件名.主类名
    USING ARCHIVE 'volume:user://~/cz_llm.zip' 
    connection sg_fc_api_conn -- 需要提前创建 API Connection
    WITH PROPERTIES (
        'remote.udf.api' = 'python3.mc.v0'
    )
COMMENT 'Usage: python get_industry_classification.py <text> <prompt> <api_key> <model_name> [temperature] [enable_search]';

创建过程会持续1分钟左右。创建完成后,执行验证函数:(注意替换 '${api_key}'

SELECT    public.fc_cz_llm (
          '云器科技',
          '请返回国家通用的行业分类,返回JSON 并用中文:{"一级行业":"xxx","二级行业":"xxx"}',
          '${api_key}',
          'qwen-plus',
          '0.4',
          'true'
          ) AS llm_result;

执行效果:

联系我们
预约咨询
微信咨询
电话咨询