ClickZetta Connector 官方 Python SDK

clickzetta-connector-python 是云器 ClickZetta Lakehouse 的官方 Python SDK,遵循 PEP-249 规范,提供了一个符合 Python Database API 风格的 SQL 调用接口。通过使用该接口,您可以轻松地在 Python 应用程序中执行 SQL 查询、插入、更新和删除操作。 clickzetta-ingestion-python 支持批量数据上传(bulkload)功能,可以大幅提高数据导入速度。这对于处理大量数据的场景尤为有用。

使用示例

  1. 删除旧版本依赖

如果安装过旧版本的sdk,先卸载旧版本的 clickzetta-connector 和 clickzetta-sqlalchemy 包

pip uninstall clickzetta-connector clickzetta-sqlalchemy -y
  1. 安装 clickzetta-connector-python,Python版本要求 ( >= 3.6, <=3.11 ):
pip install clickzetta-connector-python
  1. 如果需要使用批量数据上传功能,需要安装 clickzetta-ingestion-python 包, Python版本要求 ( >= 3.6, <=3.11 ):
pip install "clickzetta-ingestion-python[all]" -U

按需安装 clickzetta-ingestion-python 云环境包

typecommandcomment
allpip install "clickzetta-ingestion-python[all]"安装所有云环境包
s3pip install "clickzetta-ingestion-python[s3]"安装亚马逊云环境包
amazonpip install "clickzetta-ingestion-python[amazon]"安装亚马逊云环境包
awspip install "clickzetta-ingestion-python[aws]"安装亚马逊云环境包
osspip install "clickzetta-ingestion-python[oss]"安装阿里云环境包
aliyunpip install "clickzetta-ingestion-python[aliyun]"安装阿里云环境包
cospip install "clickzetta-ingestion-python[cos]"安装阿里云环境包
tencentpip install "clickzetta-ingestion-python[tencent]"安装腾讯云环境包
gcppip install "clickzetta-ingestion-python[gcp]"安装 Google 云环境包
googlepip install "clickzetta-ingestion-python[google]"安装 Google 云环境包

快速开始

执行 SQL 查询

以下是一个简单的示例,展示了如何使用 clickzetta-connector-python 执行 SQL 查询:

from clickzetta import connect

### 建立连接
conn = connect(
    username='your_username',
    password='your_password',
    service='api.clickzetta.com',
    instance='your_instance',
    workspace='your_workspace',
    schema='public',
    vcluster='default'
)
参数是否必填描述
usernameY用户名
passwordY密码
serviceY连接lakehouse的地址, region.api.clickzetta.com。可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串
instanceY可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串以查看
workspaceY使用的工作空间
vclusterY使用的vc
schemaY访问的schema名

简单查询案例

#创建游标对象
cursor = conn.cursor()
#执行 SQL 查询
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live LIMIT 10;')
#获取查询结果
results = cursor.fetchall()
for row in results:
print(row)

使用 SQL hints

在 JDBC 中通过 set 命令设置的 SQL hints 可以通过 parameters 参数传递,支持的参数参考参数管理。以下是一个示例:

#设置作业运行超时时间为 30 秒
my_param = {
    'hints': {
        'sdk.job.timeout': 30
    }
}
cursor.execute('YOUR_SQL_QUERY', parameters=my_param)

使用

更多示例

1. 处理查询结果

以下示例展示了如何处理查询结果,例如将结果保存到 CSV 文件中:

import csv

# 执行查询
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live LIMIT 10;')

# 获取查询结果
results = cursor.fetchall()

# 将结果保存到 CSV 文件
with open('output.csv', 'w', newline='', encoding='utf-8') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow([column[0] for column in cursor.description])
    csv_writer.writerows(results)
# 关闭连接
cursor.close()
conn.close()

高级功能

参数绑定

clickzetta-connector-python 支持两种参数绑定风格,遵循 PEP-249 规范:

paramstyle描述示例
qmark使用问号(?) 作为参数占位符INSERT INTO test VALUES (?)
pyformat使用 Python 扩展格式代码,例如 %(name)sINSERT INTO test VALUES (%(value)s)
使用问号风格 (qmark)
# 简单示例
cursor.execute('INSERT INTO test (id, name) VALUES (?, ?)', binding_params=[1, 'test'])

# JSON 类型示例
json_data = "JSON '" + '{"id": 2, "value": "100", "comment": "JSON Sample data"}' + "'"
my_param = {
    'hints': {
        'sdk.job.timeout': 30
    }
}
cursor.execute('INSERT INTO test (id, json_col) VALUES (?, ?)', my_param, binding_params=[1, json_data])
使用qmark风格批量插入

使用 executemany() 方法支持高效地使用 qmark 风格执行批量插入操作:

# 准备数据
data = [
    (1, 'test1'),
    (2, 'test2'),
    (3, 'test3')
]

# 执行批量插入
cursor.executemany('INSERT INTO test (id, name) VALUES (?, ?)', data)

如需对输入的数据根据表结构进行自动转换,请开启 tolerant 参数并指定类型:

# 准备数据
data = [
    (1, 'test1'),
    (2, 0),
    (3, 0.1)
]

hints = {'hints': {
    "cz.sql.type.conversion": "tolerant"
}}

# 执行批量插入
cursor.executemany('INSERT INTO test (id, name) VALUES (int(?), string(?))', data, hints)
使用 Python 格式风格 (pyformat)
# 使用命名参数
data = {'id': 1, 'name': 'test'}
cursor.execute('INSERT INTO test (id, name) VALUES (%(id)s, "%(name)s")', data)
完整示例:复杂数据类型的批量插入

以下示例展示了如何使用 executemany 插入包含各种数据类型的数据:

table = 'test_table'
cursor.execute(f'''
    CREATE TABLE {table} (
        c_bigint BIGINT,
        c_boolean BOOLEAN,
        c_binary BINARY,
        c_char CHAR,
        c_date DATE,
        c_decimal DECIMAL(20, 6),
        c_double DOUBLE,
        c_float FLOAT,
        c_int INT,
        c_interval INTERVAL DAY,
        c_smallint SMALLINT,
        c_string STRING,
        c_timestamp TIMESTAMP,
        c_tinyint TINYINT,
        c_array ARRAY<STRUCT<a: INT, b: STRING>>,
        c_map MAP<STRING, STRING>,
        c_struct STRUCT<a: INT, b: STRING, c: DOUBLE>,
        c_varchar VARCHAR(1024),
        c_json JSON
    )
''')

data = [
    (
        1,
        True,
        b'\x01',
        'a',
        datetime.date(2022, 2, 1),
        1000.123456,
        2.0,
        1.5,
        42,
        'INTERVAL 1 DAY',
        103,
        'test string 1',
        datetime.datetime.now(),
        11,
        [(1, 'A')],
        {'key1': 'value1'},
        (1, 'A', 2.0),
        'varchar example 1',
        ("JSON '" + '{"id": 2, "value": "100", "comment": "JSON Sample data"}' + "'")
    )
]
sql = f'''
    INSERT INTO {table} (
        c_bigint, c_boolean, c_binary, c_char, c_date, c_decimal, c_double, 
        c_float, c_int, c_interval, c_smallint, c_string, c_timestamp, 
        c_tinyint, c_array, c_map, c_struct, c_varchar, c_json
    ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
    )
'''

cursor.executemany(sql, data)
my_param = {'hints': {}}

# 获取结果
cursor.execute(f'SELECT * FROM {table}', my_param)
result = cursor.fetchall()

异步执行 (execute_async)

execute_async() 方法支持异步执行 SQL 查询,特别适用于长时间运行的查询:

# 异步执行查询
cursor.execute_async('SELECT * FROM large_table')

# 检查查询是否完成
while not cursor.is_job_finished():
    print("查询执行中...")
    time.sleep(1)

# 获取结果
results = cursor.fetchall()

注意事项

  • 不支持commit和rollback接口

联系我们
预约咨询
微信咨询
电话咨询