Studio Python 任务开发指南(Python Connector)
Studio Python 任务内置
clickzetta-connectorclickzetta-connector
,通过
get_active_lakehouse_engine().raw_connection()get_active_lakehouse_engine().raw_connection()
即可拿到符合 PEP 249 规范的 connection 对象,直接使用 cursor/executemany 执行 SQL 和批量写入。
本文以 用户行为事件写入 + 漏斗分析 为例,演示完整开发流程,包含任务参数的使用方式。脚本自包含,直接粘贴到 Studio 任务里即可运行,无需额外准备。所有代码经过 cz-cli task execute 实测验证。
核心机制
Python 任务里获取 connector connection 只需两行:
from clickzetta_dbutils import get_active_lakehouse_engine
engine = get_active_lakehouse_engine(schema="your_schema")
conn = engine.raw_connection() # 底层 clickzetta connector connection
cursor = conn.cursor()
get_active_lakehouse_engine()get_active_lakehouse_engine()
从 Studio 运行时注入的连接信息中自动构建 engine,不需要硬编码用户名密码。
raw_connection()raw_connection()
返回底层的 clickzetta connector connection,支持完整的 PEP 249 接口:
execute()execute()
、
executemany()executemany()
、
fetchall()fetchall()
、
fetchmany()fetchmany()
、
cursor.descriptioncursor.description
等。
⚠️ 注意 :ClickZetta 不支持事务,
commit()commit()
和
rollback()rollback()
接口无效,每条 SQL 自动提交。
任务参数 :Studio 支持在脚本里用
'${param_name}''${param_name}'
引用参数,运行时自动替换为实际值。字符串参数加引号,数值参数不加引号:
biz_date = '${biz_date}' # 字符串,运行时替换为 2024-12-01
limit = ${limit} # 数值,运行时替换为 100
配置参数取值 有两种方式:
Studio UI :在任务编辑器里点击右侧 参数 按钮,系统自动识别 ${biz_date}${biz_date}
,为它赋值(如 $[yyyy-MM-dd, -1d]$[yyyy-MM-dd, -1d]
)
cz-cli :在 save-contentsave-content
时通过 --params--params
传入 JSON:
cz-cli task save-content my_task --file task.py \
--params '{"biz_date": "$[yyyy-MM-dd, -1d]"}' \
--profile <your-profile>
运行时参数替换规则 :
调度运行 :系统按配置的表达式自动计算并替换,如 $[yyyy-MM-dd, -1d]$[yyyy-MM-dd, -1d]
替换为昨天日期
临时执行 :配置的表达式不生效,必须通过 --param--param
手动指定值,或在 Studio UI 弹窗里输入
临时执行时手动指定参数值:
cz-cli task execute my_task --param "biz_date=2024-12-01" --profile <your-profile>
场景:用户行为漏斗分析(含任务参数)
每天凌晨处理前一天的用户行为数据,通过任务参数
biz_datebiz_date
控制处理哪天的数据,结果写入汇总表。
完整脚本
任务参数和连接初始化:
import datetime
from clickzetta_dbutils import get_active_lakehouse_engine
biz_date = '${biz_date}'
print(f"处理日期:{biz_date}")
engine = get_active_lakehouse_engine(schema="doc_connector_demo")
conn = engine.raw_connection()
cursor = conn.cursor()
print("连接成功")
建表(幂等,首次运行自动创建):
cursor.execute("CREATE SCHEMA IF NOT EXISTS doc_connector_demo")
cursor.execute("""
CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_events (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
page STRING,
duration INT,
event_time TIMESTAMP
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS doc_connector_demo.doc_funnel_daily (
biz_date STRING,
step1_view BIGINT,
step2_cart BIGINT,
step3_checkout BIGINT,
run_time TIMESTAMP
)
""")
print("建表完成")
写入当天事件数据:
events = [
(1, 101, 'view', 'home', 30, datetime.datetime(2024, 12, 1, 10, 0, 0)),
(2, 101, 'click', 'product', 5, datetime.datetime(2024, 12, 1, 10, 0, 35)),
(3, 102, 'view', 'home', 45, datetime.datetime(2024, 12, 1, 10, 1, 0)),
(4, 102, 'view', 'product', 120, datetime.datetime(2024, 12, 1, 10, 2, 0)),
(5, 102, 'click', 'cart', 8, datetime.datetime(2024, 12, 1, 10, 4, 0)),
(6, 103, 'view', 'home', 15, datetime.datetime(2024, 12, 1, 10, 5, 0)),
(7, 103, 'view', 'product', 200, datetime.datetime(2024, 12, 1, 10, 5, 20)),
(8, 103, 'click', 'cart', 6, datetime.datetime(2024, 12, 1, 10, 8, 0)),
(9, 103, 'click', 'checkout', 12, datetime.datetime(2024, 12, 1, 10, 8, 10)),
(10, 104, 'view', 'home', 10, datetime.datetime(2024, 12, 1, 10, 9, 0)),
]
cursor.executemany(
"INSERT INTO doc_connector_demo.doc_events VALUES (?, ?, ?, ?, ?, ?)",
events
)
print(f"写入 {len(events)} 条事件记录")
查询指定日期的事件统计:
cursor.execute(f"""
SELECT event_type,
COUNT(*) AS cnt,
AVG(duration) AS avg_duration
FROM doc_connector_demo.doc_events
WHERE DATE(event_time) = DATE '{biz_date}'
GROUP BY event_type
ORDER BY cnt DESC
""")
col_names = [col[0] for col in cursor.description]
rows = cursor.fetchall()
print(f"\n{biz_date} 事件统计(列:{col_names}):")
for row in rows:
print(f" {row[0]:10s} 次数={row[1]} 平均时长={row[2]:.1f}s")
漏斗分析:
cursor.execute(f"""
SELECT
COUNT(DISTINCT CASE WHEN event_type = 'view' THEN user_id END) AS step1_view,
COUNT(DISTINCT CASE WHEN event_type = 'click' AND page = 'cart' THEN user_id END) AS step2_cart,
COUNT(DISTINCT CASE WHEN event_type = 'click' AND page = 'checkout' THEN user_id END) AS step3_checkout
FROM doc_connector_demo.doc_events
WHERE DATE(event_time) = DATE '{biz_date}'
""")
row = cursor.fetchone()
view, cart, checkout = row[0], row[1], row[2]
print(f"\n漏斗分析({biz_date}):")
print(f" 浏览首页: {view} 人")
if view > 0:
print(f" 加入购物车: {cart} 人 (转化率 {cart/view*100:.0f}%)")
print(f" 进入结算: {checkout} 人 (转化率 {checkout/view*100:.0f}%)")
fetchmany 分批读取 + 写入汇总:
cursor.execute(f"""
SELECT event_id, user_id, event_type, page, duration
FROM doc_connector_demo.doc_events
WHERE DATE(event_time) = DATE '{biz_date}'
ORDER BY event_id
""")
print(f"\nfetchmany 分批读取(每批 3 行):")
while True:
batch = cursor.fetchmany(3)
if not batch:
break
print(f" 本批 {len(batch)} 行: event_id {batch[0][0]} ~ {batch[-1][0]}")
cursor.executemany(
"INSERT INTO doc_connector_demo.doc_funnel_daily VALUES (?, ?, ?, ?, ?)",
[(biz_date, view, cart, checkout, datetime.datetime.now())]
)
print(f"\n汇总结果已写入 doc_funnel_daily")
cursor.close()
conn.close()
print("完成")
创建并执行任务
Studio UI
进入 数据开发 → 新建任务 ,选择 Python 类型,填写任务名称
将上方脚本粘贴到编辑器
点击右侧 参数 按钮,系统自动识别 ${biz_date}${biz_date}
,赋值为 $[yyyy-MM-dd, -1d]$[yyyy-MM-dd, -1d]
(取昨天日期)
点击 调度 按钮,配置 VCluster(选通用型 DEFAULTDEFAULT
)和 Cron 表达式(如 0 3 * * *0 3 * * *
)
点击 发布 ,再点击 运行 → 在弹窗里输入 biz_date=2024-12-01biz_date=2024-12-01
验证
cz-cli (适合 CI/CD 或批量管理场景,详见 Studio 任务开发与运维 )
创建任务:
cz-cli task create connector_funnel --type python --profile <your-profile>
上传脚本并配置参数(biz_date 取昨天日期):
cz-cli task save-content connector_funnel --file connector_funnel.py \
--params '{"biz_date": "$[yyyy-MM-dd, -1d]"}' \
--profile <your-profile>
配置调度(每天凌晨 3 点执行):
cz-cli task save-config connector_funnel --vcluster DEFAULT --retry-count 1 --profile <your-profile>
cz-cli task save-cron connector_funnel --cron "0 3 * * *" --profile <your-profile>
发布并临时执行验证:
cz-cli task online connector_funnel -y --profile <your-profile>
cz-cli task execute connector_funnel --param "biz_date=2024-12-01" --profile <your-profile>
执行结果
处理日期:2024-12-01
连接成功
建表完成
写入 10 条事件记录
2024-12-01 事件统计(列:['event_type', 'cnt', 'avg_duration']):
view 次数=6 平均时长=70.0s
click 次数=4 平均时长=7.8s
漏斗分析(2024-12-01):
浏览首页: 4 人
加入购物车: 2 人 (转化率 50%)
进入结算: 1 人 (转化率 25%)
fetchmany 分批读取(每批 3 行):
本批 3 行: event_id 1 ~ 3
本批 3 行: event_id 4 ~ 6
本批 3 行: event_id 7 ~ 9
本批 1 行: event_id 10 ~ 10
汇总结果已写入 doc_funnel_daily
完成
验证写入结果:
SELECT biz_date, step1_view, step2_cart, step3_checkout
FROM doc_connector_demo.doc_funnel_daily
ORDER BY biz_date DESC
LIMIT 5;
biz_date step1_view step2_cart step3_checkout
2024-12-01 4 2 1
常用接口速查
接口 说明 cursor.execute(sql)cursor.execute(sql)
执行单条 SQL cursor.executemany(sql, data)cursor.executemany(sql, data)
批量写入,datadata
为 list of tuple cursor.fetchall()cursor.fetchall()
获取全部结果行 cursor.fetchone()cursor.fetchone()
获取单行 cursor.fetchmany(n)cursor.fetchmany(n)
分批获取,每次 n 行,结果耗尽返回 [][]
cursor.descriptioncursor.description
列元数据,col[0]col[0]
为列名
executemanyexecutemany
占位符 :用
??
表示参数位置,对应 tuple 中的值按顺序绑定。TIMESTAMP 列传
datetime.datetimedatetime.datetime
对象,DATE 列传
datetime.datedatetime.date
对象。
与 ZettaPark 的选型对比
Python Connector ZettaPark 接口风格 PEP 249 cursor/SQL DataFrame 链式操作 适合场景 批量写入、精确 SQL 控制 数据处理、聚合、pandas 集成 写入方式 executemany()executemany()
create_dataframe().write.save_as_table()create_dataframe().write.save_as_table()
读取方式 fetchall()fetchall()
/ fetchmany()fetchmany()
to_pandas()to_pandas()
/ show()show()
依赖 clickzetta-connectorclickzetta-connector
(内置)clickzetta-zettapark-pythonclickzetta-zettapark-python
(内置)
两者可以在同一个 Python 任务里混用:用 connector 写入原始数据,用 ZettaPark 做聚合分析。
相关文档