ClickZetta Connector Python SDK
clickzetta-connectorclickzetta-connector
是云器 Lakehouse 的官方 Python SDK,遵循 PEP-249 规范,提供符合 Python Database API 风格的 SQL 调用接口,支持查询、写入、批量插入和异步执行。
本文覆盖 :安装 → 建立连接 → 执行查询 → 参数绑定 → 批量插入 → 异步执行。
安装
pip install clickzetta-connector
Python 版本要求 >= 3.10。
如果已安装旧版本,先卸载避免冲突:
pip uninstall clickzetta-connector clickzetta-connector-python clickzetta-sqlalchemy clickzetta-ingestion-python clickzetta-ingestion-python-v2 -y
建立连接
from clickzetta import connect
conn = connect(
username='your_username',
password='your_password',
service='cn-shanghai-alicloud.api.clickzetta.com', # 服务端点
instance='your_instance', # 实例名
workspace='your_workspace', # 工作空间名
schema='public', # 默认 Schema
vcluster='DEFAULT' # 计算集群名
)
连接参数说明
参数 必填 说明 usernameusername
✅ 用户名 passwordpassword
✅ 密码 serviceservice
✅ 服务端点,格式 region_id.api.clickzetta.comregion_id.api.clickzetta.com
,在 Studio 管理 → 工作空间的 JDBC 连接串中获取 instanceinstance
✅ 实例名,同上 workspaceworkspace
✅ 工作空间名 schemaschema
✅ 默认 Schema 名 vclustervcluster
✅ 计算集群名,默认填 defaultdefault
protocolprotocol
❌ 默认 httpshttps
,支持 httphttp
/ httpshttps
执行查询
cursor = conn.cursor()
执行查询:
cursor.execute('SELECT * FROM orders LIMIT 10')
获取结果:
results = cursor.fetchall()
for row in results:
print(row)
用完关闭:
cursor.close()
conn.close()
保存结果到 CSV:
import csv
cursor.execute('SELECT * FROM orders LIMIT 1000')
results = cursor.fetchall()
with open('output.csv', 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([col[0] for col in cursor.description]) # 写表头
writer.writerows(results)
SQL Hints
通过
parametersparameters
传递 SQL hints,例如设置超时时间:
params = {
'hints': {
'sdk.job.timeout': 30 # 超时 30 秒
}
}
cursor.execute('SELECT * FROM large_table', parameters=params)
支持的参数见 参数管理 。
参数绑定
⚠️ 需要
clickzetta-connector >= 1.0.11clickzetta-connector >= 1.0.11
支持两种风格,遵循 PEP-249 :
风格 占位符 示例 qmarkqmark
??
INSERT INTO t VALUES (?, ?)INSERT INTO t VALUES (?, ?)
pyformatpyformat
%(name)s%(name)s
INSERT INTO t VALUES (%(id)s, %(name)s)INSERT INTO t VALUES (%(id)s, %(name)s)
qmark 风格:
cursor.execute('INSERT INTO test (id, name) VALUES (?, ?)', binding_params=[1, 'test'])
pyformat 风格:
data = {'id': 1, 'name': 'test'}
cursor.execute('INSERT INTO test (id, name) VALUES (%(id)s, "%(name)s")', data)
⚠️ pyformat 风格中,字符串参数需要在 SQL 里加引号:
"%(name)s""%(name)s"
批量插入
使用
executemany()executemany()
高效批量写入:
data = [
(1, 'Alice'),
(2, 'Bob'),
(3, 'Charlie')
]
cursor.executemany('INSERT INTO users (id, name) VALUES (?, ?)', data)
自动类型转换(tolerant 模式):
hints = {'hints': {'cz.sql.type.conversion': 'tolerant'}}
cursor.executemany('INSERT INTO test (id, name) VALUES (int(?), string(?))', data, hints)
复杂类型批量插入示例(ARRAY / MAP / STRUCT / JSON):
import datetime
cursor.execute('''
CREATE TABLE test_table (
c_bigint BIGINT,
c_boolean BOOLEAN,
c_date DATE,
c_decimal DECIMAL(20, 6),
c_string STRING,
c_array ARRAY<STRUCT<a: INT, b: STRING>>,
c_map MAP<STRING, STRING>,
c_struct STRUCT<a: INT, b: STRING, c: DOUBLE>,
c_json JSON
)
''')
data = [(
1, True,
datetime.date(2024, 1, 1),
1000.123456,
'hello',
[(1, 'A')],
{'key': 'value'},
(1, 'A', 2.0),
"JSON '{\"id\": 1}'"
)]
cursor.executemany(
'INSERT INTO test_table VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)',
data
)
异步执行
适合长时间运行的查询,避免阻塞:
import time
cursor.execute_async('SELECT count(*) FROM large_table')
while not cursor.is_job_finished():
print("执行中...")
time.sleep(1)
results = cursor.fetchall()
print(results)
注意事项
不支持 commitcommit
和 rollbackrollback
接口
查询结果较大时建议使用 fetchmany(size)fetchmany(size)
分批获取,避免内存溢出
生产环境建议使用连接池管理连接,避免频繁创建销毁
相关文档