使用 Python 批量上传数据(BulkLoad)
Clickzetta Lakehouse 通过 clickzetta-ingestion-python
包提供了 Python 语言进行批量数据上传(Bulkload)的 API。该 API 使得数据可以直接从客户端发送到存储系统,传输过程不消耗计算资源,数据在显式 commit 后可见(commit 过程会消耗少量计算资源)。适用于高吞吐,对数据新鲜度(data freshness)要求相对宽松的场景。
通过 Bulkload 相关 API,可以实现单线程以及分布式数据上传的场景。
安装
按需安装 clickzetta-ingestion-python
云环境包
type | command | comment |
---|---|---|
all | pip install "clickzetta-ingestion-python[all]" | 安装所有云环境包 |
s3 | pip install "clickzetta-ingestion-python[s3]" | 安装亚马逊云环境包 |
amazon | pip install "clickzetta-ingestion-python[amazon]" | 安装亚马逊云环境包 |
aws | pip install "clickzetta-ingestion-python[aws]" | 安装亚马逊云环境包 |
oss | pip install "clickzetta-ingestion-python[oss]" | 安装阿里云环境包 |
aliyun | pip install "clickzetta-ingestion-python[aliyun]" | 安装阿里云环境包 |
cos | pip install "clickzetta-ingestion-python[cos]" | 安装阿里云环境包 |
tencent | pip install "clickzetta-ingestion-python[tencent]" | 安装腾讯云环境包 |
gcp | pip install "clickzetta-ingestion-python[gcp]" | 安装 Google 云环境包 |
pip install "clickzetta-ingestion-python[google]" | 安装 Google 云环境包 |
批量导入原理
批量上传的SDK提供了一种高效的数据导入机制,适用于云器Lakehouse。以下是其工作原理的简化描述和流程图:
- 数据上传:通过SDK,您的数据首先被上传到对象存储服务。这一步骤的性能受到本地网络速度和并发连接数的影响。
- 触发导入:数据上传完成后,当您调用
bulkloadStream.commit()
方法时,SDK会自动触发一个SQL命令,将数据从对象存储导入到Lakehouse的表中。不建议您在一个任务中频繁调用bulkloadStream.commit()
,bulkloadStream.commit()
最后只能调用一次。 - 计算资源:上传数据建议选择通用型计算集群(GENERAL PURPOSE VIRTUAL CLUSTER)通用型计算资源更加适合跑批量作业和加载数据作业,数据从对象存储到Lakehouse表的导入速度取决于您配置的计算资源的大小。
- 分片上传优化:处理大于1GB的压缩数据时,建议在
createRow
方法中为每个并发线程或进程分配唯一的分片ID。这种做法能够充分发挥多线程或多进程的并行处理优势,显著提升数据导入的效率。最佳实践是,根据并发的数量来确定分片ID的数量,确保每个并发对应一个独立的分片ID。如果多个并发被分配了相同的分片ID,最终写入的数据可能会发生覆盖,导致先前的数据丢失。为确保所有分片的数据都被正确导入表中,请在所有并发操作完成后,调用bulkloadStream.commit()
方法来提交整个导入任务。
以下是批量导入原理的流程图:
单线程写入
假设上传数据的目标表为 public.bulkload_test
,DDL 如下:
单线程模式的完整样例代码:
API 分步详解
- 创建
connection
对象,根据您的实际情况替换参数即可:
参数 | 是否必填 | 描述 |
---|---|---|
username | Y | 用户名 |
password | Y | 密码 |
service | Y | 连接lakehouse的地址, region.api.clickzetta.com。可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串![]() |
instance | Y | 可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串以查看![]() |
workspace | Y | 使用的工作空间 |
vcluster | Y | 使用的vc |
schema | Y | 访问的schema名 |
- 创建
BulkLoad Stream
对象,指定上传的目标表、上传方式等: 必选参数table
表名称 可选参数schema
,如未指定则使用connection
指定的schema
operation
BulkLoadOperation.APPEND
:增量模式(写入的数据都作为新数据,不对老数据有任何影响) *BulkLoadOperation.OVERWRITE
:覆盖模式(清空老表数据,将新数据写入表中)
-
partition_spec
用于指定目标表的分区信息,控制数据写入的分区行为。-
非分区表:忽略此参数或设置为
-
分区表:
- 静态分区写入,需要将所有数据写入指定的固定分区,无论源数据中分区列的实际值是什么,写入目标表时都会使用
partition_spec
指定的分区值,所有数据都会写入到同一个指定分区中。参数格式是'分区列1=值1,分区列2=值2'
- 动态分区写入,根据数据中分区列的实际值,自动写入到对应分区。忽略此参数,系统根据数据中分区列的值自动创建或写入相应分区
- 静态分区写入,需要将所有数据写入指定的固定分区,无论源数据中分区列的实际值是什么,写入目标表时都会使用
-
- 创建
writer
并写入数据: 每个bulkload stream
可以创建多个writer
,不同writer
需要用不同 id 标识。使用多个writer
可以实现在一次 commit 中,多线程并发写或者分布式并发写的场景。 - 写入数据:
通过 writer 写入的数据将直接在存储系统中形成相应的 parquet 文件。writer 将根据写入的数据量自动进行文件切割。当 writer 写入数据结束后,需要显式调用 writer.close() 来保证数据完整性。 - 提交 stream。commit 前需要确保各 writer 均已完成写入并关闭。commit 成功后数据在表中可见。
分布式模式写入
Bulkdload stream 由 stream id(uuid)标识,可以通过 connection 的 get_bulkload_stream 方法来获取由其他进程创建的 stream 对象,并创建 writer,实现分布式写入。
示例代码
联系我们