ClickZetta Connector Python SDK

clickzetta-connector
clickzetta-connector
是云器 Lakehouse 的官方 Python SDK,遵循 PEP-249 规范,提供符合 Python Database API 风格的 SQL 调用接口,支持查询、写入、批量插入和异步执行。

本文覆盖:安装 → 建立连接 → 执行查询 → 参数绑定 → 批量插入 → 异步执行。


安装

pip install clickzetta-connector

Python 版本要求 >= 3.10。

如果已安装旧版本,先卸载避免冲突:

pip uninstall clickzetta-connector clickzetta-connector-python clickzetta-sqlalchemy clickzetta-ingestion-python clickzetta-ingestion-python-v2 -y


建立连接

from clickzetta import connect conn = connect( username='your_username', password='your_password', service='cn-shanghai-alicloud.api.clickzetta.com', # 服务端点 instance='your_instance', # 实例名 workspace='your_workspace', # 工作空间名 schema='public', # 默认 Schema vcluster='DEFAULT' # 计算集群名 )

连接参数说明

参数必填说明
username
username
用户名
password
password
密码
service
service
服务端点,格式
region_id.api.clickzetta.com
region_id.api.clickzetta.com
,在 Studio 管理 → 工作空间的 JDBC 连接串中获取
instance
instance
实例名,同上
workspace
workspace
工作空间名
schema
schema
默认 Schema 名
vcluster
vcluster
计算集群名,默认填
default
default
protocol
protocol
默认
https
https
,支持
http
http
/
https
https

执行查询

cursor = conn.cursor()

执行查询:

cursor.execute('SELECT * FROM orders LIMIT 10')

获取结果:

results = cursor.fetchall() for row in results: print(row)

用完关闭:

cursor.close() conn.close()

保存结果到 CSV:

import csv cursor.execute('SELECT * FROM orders LIMIT 1000') results = cursor.fetchall() with open('output.csv', 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow([col[0] for col in cursor.description]) # 写表头 writer.writerows(results)


SQL Hints

通过

parameters
parameters
传递 SQL hints,例如设置超时时间:

params = { 'hints': { 'sdk.job.timeout': 30 # 超时 30 秒 } } cursor.execute('SELECT * FROM large_table', parameters=params)

支持的参数见 参数管理


参数绑定

支持两种风格,遵循 PEP-249

风格占位符示例
qmark
qmark
?
?
INSERT INTO t VALUES (?, ?)
INSERT INTO t VALUES (?, ?)
pyformat
pyformat
%(name)s
%(name)s
INSERT INTO t VALUES (%(id)s, %(name)s)
INSERT INTO t VALUES (%(id)s, %(name)s)

qmark 风格:

cursor.execute('INSERT INTO test (id, name) VALUES (?, ?)', binding_params=[1, 'test'])

pyformat 风格:

data = {'id': 1, 'name': 'test'} cursor.execute('INSERT INTO test (id, name) VALUES (%(id)s, "%(name)s")', data)


批量插入

使用

executemany()
executemany()
高效批量写入:

data = [ (1, 'Alice'), (2, 'Bob'), (3, 'Charlie') ] cursor.executemany('INSERT INTO users (id, name) VALUES (?, ?)', data)

自动类型转换(tolerant 模式):

hints = {'hints': {'cz.sql.type.conversion': 'tolerant'}} cursor.executemany('INSERT INTO test (id, name) VALUES (int(?), string(?))', data, hints)

复杂类型批量插入示例(ARRAY / MAP / STRUCT / JSON):

import datetime cursor.execute(''' CREATE TABLE test_table ( c_bigint BIGINT, c_boolean BOOLEAN, c_date DATE, c_decimal DECIMAL(20, 6), c_string STRING, c_array ARRAY<STRUCT<a: INT, b: STRING>>, c_map MAP<STRING, STRING>, c_struct STRUCT<a: INT, b: STRING, c: DOUBLE>, c_json JSON ) ''') data = [( 1, True, datetime.date(2024, 1, 1), 1000.123456, 'hello', [(1, 'A')], {'key': 'value'}, (1, 'A', 2.0), "JSON '{\"id\": 1}'" )] cursor.executemany( 'INSERT INTO test_table VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)', data )


异步执行

适合长时间运行的查询,避免阻塞:

import time cursor.execute_async('SELECT count(*) FROM large_table') while not cursor.is_job_finished(): print("执行中...") time.sleep(1) results = cursor.fetchall() print(results)


注意事项

  • 不支持
    commit
    commit
    rollback
    rollback
    接口
  • 查询结果较大时建议使用
    fetchmany(size)
    fetchmany(size)
    分批获取,避免内存溢出
  • 生产环境建议使用连接池管理连接,避免频繁创建销毁

相关文档

文档说明
ZettaparkPython DataFrame API,类 pandas 操作 Lakehouse 数据
SQLAlchemy 连接ORM 和 BI 工具连接配置
BulkLoad 批量导入百万行级高速批量写入
参数管理SQL hints 和会话参数参考
连接到 Lakehouse各种连接方式总览
使用示例(完整场景)批量写入、条件查询、聚合、CSV 导出、异步执行等实测示例
高级用法fetch_pandas、性能分析、取消查询、动态切换 Schema 等
联系我们
预约咨询
微信咨询
电话咨询
邮件咨询