ClickZetta Connector 官方 Python SDK
clickzetta-connector-python
是云器 ClickZetta Lakehouse 的官方 Python SDK,遵循 PEP-249 规范,提供了一个符合 Python Database API 风格的 SQL 调用接口。通过使用该接口,您可以轻松地在 Python 应用程序中执行 SQL 查询、插入、更新和删除操作。
clickzetta-ingestion-python
支持批量数据上传(bulkload)功能,可以大幅提高数据导入速度。这对于处理大量数据的场景尤为有用。
使用示例
- 删除旧版本依赖
如果安装过旧版本的sdk,先卸载旧版本的 clickzetta-connector 和 clickzetta-sqlalchemy 包
pip uninstall clickzetta-connector clickzetta-sqlalchemy -y
注意 clickzetta-connector 后续版本将不再维护,请按需安装 clickzetta-connector-python 和 clickzetta-ingestion-python 包
- 安装 clickzetta-connector-python,Python版本要求 ( >= 3.6, <=3.11 ):
pip install clickzetta-connector-python
- 如果需要使用批量数据上传功能,需要安装
clickzetta-ingestion-python
包, Python版本要求 ( >= 3.6, <=3.11 ):
pip install "clickzetta-ingestion-python[all]" -U
注意:all 为安装所有云环境包,安装时间较长,并可能会导致冲突,可以按需安装,具体参考下文
按需安装 clickzetta-ingestion-python
云环境包
type | command | comment |
---|
all | pip install "clickzetta-ingestion-python[all]" | 安装所有云环境包 |
s3 | pip install "clickzetta-ingestion-python[s3]" | 安装亚马逊云环境包 |
amazon | pip install "clickzetta-ingestion-python[amazon]" | 安装亚马逊云环境包 |
aws | pip install "clickzetta-ingestion-python[aws]" | 安装亚马逊云环境包 |
oss | pip install "clickzetta-ingestion-python[oss]" | 安装阿里云环境包 |
aliyun | pip install "clickzetta-ingestion-python[aliyun]" | 安装阿里云环境包 |
cos | pip install "clickzetta-ingestion-python[cos]" | 安装阿里云环境包 |
tencent | pip install "clickzetta-ingestion-python[tencent]" | 安装腾讯云环境包 |
gcp | pip install "clickzetta-ingestion-python[gcp]" | 安装 Google 云环境包 |
google | pip install "clickzetta-ingestion-python[google]" | 安装 Google 云环境包 |
快速开始
执行 SQL 查询
以下是一个简单的示例,展示了如何使用 clickzetta-connector-python
执行 SQL 查询:
from clickzetta import connect
### 建立连接
conn = connect(
username='your_username',
password='your_password',
service='api.clickzetta.com',
instance='your_instance',
workspace='your_workspace',
schema='public',
vcluster='default'
)
参数 | 是否必填 | 描述 |
---|
username | Y | 用户名 |
password | Y | 密码 |
service | Y | 连接lakehouse的地址, region.api.clickzetta.com。可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串 |
instance | Y | 可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串以查看 |
workspace | Y | 使用的工作空间 |
vcluster | Y | 使用的vc |
schema | Y | 访问的schema名 |
简单查询案例
#创建游标对象
cursor = conn.cursor()
#执行 SQL 查询
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live LIMIT 10;')
#获取查询结果
results = cursor.fetchall()
for row in results:
print(row)
使用 SQL hints
在 JDBC 中通过 set 命令设置的 SQL hints 可以通过 parameters
参数传递,支持的参数参考参数管理。以下是一个示例:
#设置作业运行超时时间为 30 秒
my_param = {
'hints': {
'sdk.job.timeout': 30
}
}
cursor.execute('YOUR_SQL_QUERY', parameters=my_param)
使用
更多示例
1. 处理查询结果
以下示例展示了如何处理查询结果,例如将结果保存到 CSV 文件中:
import csv
# 执行查询
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live LIMIT 10;')
# 获取查询结果
results = cursor.fetchall()
# 将结果保存到 CSV 文件
with open('output.csv', 'w', newline='', encoding='utf-8') as csvfile:
csv_writer = csv.writer(csvfile)
csv_writer.writerow([column[0] for column in cursor.description])
csv_writer.writerows(results)
# 关闭连接
cursor.close()
conn.close()
高级功能
注意:以下高级功能需要使用 clickzetta-connector-python >= 0.8.82
版本。
参数绑定
clickzetta-connector-python 支持两种参数绑定风格,遵循 PEP-249 规范:
paramstyle | 描述 | 示例 |
---|
qmark | 使用问号(?) 作为参数占位符 | INSERT INTO test VALUES (?) |
pyformat | 使用 Python 扩展格式代码,例如 %(name)s | INSERT INTO test VALUES (%(value)s) |
使用问号风格 (qmark)
# 简单示例
cursor.execute('INSERT INTO test (id, name) VALUES (?, ?)', binding_params=[1, 'test'])
# JSON 类型示例
json_data = "JSON '" + '{"id": 2, "value": "100", "comment": "JSON Sample data"}' + "'"
my_param = {
'hints': {
'sdk.job.timeout': 30
}
}
cursor.execute('INSERT INTO test (id, json_col) VALUES (?, ?)', my_param, binding_params=[1, json_data])
使用qmark风格批量插入
使用 executemany()
方法支持高效地使用 qmark 风格执行批量插入操作:
# 准备数据
data = [
(1, 'test1'),
(2, 'test2'),
(3, 'test3')
]
# 执行批量插入
cursor.executemany('INSERT INTO test (id, name) VALUES (?, ?)', data)
如需对输入的数据根据表结构进行自动转换,请开启 tolerant
参数并指定类型:
# 准备数据
data = [
(1, 'test1'),
(2, 0),
(3, 0.1)
]
hints = {'hints': {
"cz.sql.type.conversion": "tolerant"
}}
# 执行批量插入
cursor.executemany('INSERT INTO test (id, name) VALUES (int(?), string(?))', data, hints)
使用 Python 格式风格 (pyformat)
# 使用命名参数
data = {'id': 1, 'name': 'test'}
cursor.execute('INSERT INTO test (id, name) VALUES (%(id)s, "%(name)s")', data)
注意:在 pyformat 风格中,参数值是字符串插入时需要加引号。
完整示例:复杂数据类型的批量插入
以下示例展示了如何使用 executemany
插入包含各种数据类型的数据:
table = 'test_table'
cursor.execute(f'''
CREATE TABLE {table} (
c_bigint BIGINT,
c_boolean BOOLEAN,
c_binary BINARY,
c_char CHAR,
c_date DATE,
c_decimal DECIMAL(20, 6),
c_double DOUBLE,
c_float FLOAT,
c_int INT,
c_interval INTERVAL DAY,
c_smallint SMALLINT,
c_string STRING,
c_timestamp TIMESTAMP,
c_tinyint TINYINT,
c_array ARRAY<STRUCT<a: INT, b: STRING>>,
c_map MAP<STRING, STRING>,
c_struct STRUCT<a: INT, b: STRING, c: DOUBLE>,
c_varchar VARCHAR(1024),
c_json JSON
)
''')
data = [
(
1,
True,
b'\x01',
'a',
datetime.date(2022, 2, 1),
1000.123456,
2.0,
1.5,
42,
'INTERVAL 1 DAY',
103,
'test string 1',
datetime.datetime.now(),
11,
[(1, 'A')],
{'key1': 'value1'},
(1, 'A', 2.0),
'varchar example 1',
("JSON '" + '{"id": 2, "value": "100", "comment": "JSON Sample data"}' + "'")
)
]
sql = f'''
INSERT INTO {table} (
c_bigint, c_boolean, c_binary, c_char, c_date, c_decimal, c_double,
c_float, c_int, c_interval, c_smallint, c_string, c_timestamp,
c_tinyint, c_array, c_map, c_struct, c_varchar, c_json
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
'''
cursor.executemany(sql, data)
my_param = {'hints': {}}
# 获取结果
cursor.execute(f'SELECT * FROM {table}', my_param)
result = cursor.fetchall()
异步执行 (execute_async)
execute_async()
方法支持异步执行 SQL 查询,特别适用于长时间运行的查询:
# 异步执行查询
cursor.execute_async('SELECT * FROM large_table')
# 检查查询是否完成
while not cursor.is_job_finished():
print("查询执行中...")
time.sleep(1)
# 获取结果
results = cursor.fetchall()
注意事项