Lakehouse Bulkload 快速入门
简介
Bulkload 是 Lakehouse 提供的一种面向高吞吐量设计的批量数据写入接口,特别适合处理大量连续性写入场景。使用 Bulkload 写入的数据在提交后即可查看。
批量导入原理
批量上传的SDK提供了一种高效的数据导入机制,适用于云器Lakehouse。以下是其工作原理的简化描述和流程图:
- 数据上传:通过SDK,您的数据首先被上传到对象存储服务。这一步骤的性能受到本地网络速度和并发连接数的影响。
- 触发导入:数据上传完成后,当您调用
bulkloadStream.close()
方法时,SDK会自动触发一个SQL命令,将数据从对象存储导入到Lakehouse的表中。不建议您在一个任务中频繁调用bulkloadStream.close()
,bulkloadStream.close()
最后只能调用一次。
- 计算资源:上传数据建议选择通用型计算集群(GENERAL PURPOSE VIRTUAL CLUSTER)通用型计算资源更加适合跑批量作业和加载数据作业,数据从对象存储到Lakehouse表的导入速度取决于您配置的计算资源的大小。
- 分片上传优化:处理大于1GB的压缩数据时,建议在
createRow
方法中为每个并发线程或进程分配唯一的分片ID。这种做法能够充分发挥多线程或多进程的并行处理优势,显著提升数据导入的效率。最佳实践是,根据并发的数量来确定分片ID的数量,确保每个并发对应一个独立的分片ID。如果多个并发被分配了相同的分片ID,最终写入的数据可能会发生覆盖,导致先前的数据丢失。为确保所有分片的数据都被正确导入表中,请在所有并发操作完成后,调用bulkloadStream.close()
方法来提交整个导入任务。
以下是批量导入原理的流程图:
[SDK上传数据] ──> [对象存储] ──> [调用bulkloadStream.close()]
↓
[触发SQL命令] ──> [Lakehouse表]
适用场景
批量上传文件的SDK特别适用于以下情况:
- 一次性大量数据导入:当您需要导入大量数据时,无论是一次性的批量任务还是周期性但间隔较长的操作。
- 导入频率不高:如果您的导入数据频率不高(时间间隔大于五分钟),即使单次导入的数据量不大,使用批量导入SDK也是合适的。
不适用场景
批量上传文件的SDK不适用于以下情况:
- 实时数据导入:如果您需要在极短时间内(如5分钟之内)频繁导入数据,建议使用实时数据接口,以满足对实时性的要求。
写入限制
请注意,BulkloadStream 不支持主键(pk)表的写入。
创建 BulkloadStream
要通过 ClickZetta 客户端创建一个批量写入流,请参考以下示例代码:
RowStream stream = client.newBulkloadStreamBuilder()
.schema(schema)
.table(TABLE_NAME)
.operate(RowStream.BulkLoadOperate.APPEND)
.build();
Options
在clickzetta-java版本3.0.18以后提供了options。options用于指定上传选项入指定分取
bulkloadStream=client.newBulkloadStreamBuilder().schema(schema).table(table)
.options(BulkLoadOptions.newBuilder().withPartitionSpecs(Optional.of("your_partition_spec")).build())
.operate(RowStream.BulkLoadOperate.APPEND)
.build();
操作类型
在创建 Bulkload 时,可以通过 operate
方法指定以下操作类型:
-
RowStream.BulkLoadOperate.APPEND
:追加模式,向表中添加数据。
bulkloadStream=client.newBulkloadStreamBuilder().schema(schema).table(table)
.operate(RowStream.BulkLoadOperate.APPEND)
.build();
-
RowStream.BulkLoadOperate.OVERWRITE
:覆盖模式,删除表中现有数据后再写入新数据。
bulkloadStream=client.newBulkloadStreamBuilder().schema(schema).table(table)
.operate(RowStream.BulkLoadOperate.OVERWRITE)
.build();
写入数据
使用 Row 对象表示要写入的具体数据。通过调用 row.setValue
方法将数据封装到 Row 对象中。
Row row = stream.createRow(0);
row.setValue("id", t);
row.setValue("name", String.valueOf(t));
stream.apply(row, 0);
createRow
方法创建 Row 对象时,需要传入一个整数作为分片 ID。这个 ID 可以配合多线程/进程技术,用多个互补相同的分片 ID 来写入数据,从而有效提升写入数据的速度。
setValue
方法的第一个参数为字段名,第二个参数为具体的数据。要求数据类型与表类型一致。
apply
方法用于写入数据,需要指定 Row 对象以及相应的分片 ID。
写入复杂类型数据
// 写入数组
row.setValue("col1", Arrays.asList("first", "b", "c"));
// 写入映射
final HashMap<Integer, String> map = new HashMap<Integer, String>();
map.put(t, "first" + t);
row.setValue("col2", map);
// 写入结构体
Map<String, Object> struct = new HashMap<>();
struct.put("first", "first-" + i);
struct.put("second", i);
row.setValue("col3", struct);
提交数据
批量写入的数据只有在提交之后才可见。因此,提交过程非常重要。
- 通过
bulkloadStream.getState()
获取 BulkloadStream 的状态。
- 如果提交失败,可以通过
bulkloadStream.getErrorMessage()
获取错误信息。
使用示例
以下是一个使用 Bulkload 写入复杂类型数据的示例:
// 建表 create table complex_type(col1 array<string>,col2 map<int,string>, col3 struct<x:int,y:int>);
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.platform.client.api.Options;
import com.clickzetta.platform.client.api.Row;
import com.clickzetta.platform.client.api.Stream;
public class BulkloadStreamDemo {
public static void main(String[] args) throws Exception{
if (args.length != 5) {
System.out.println("input arguments: jdbcUrl, username, password");
System.exit(1);
}
String jdbcUrl = args[0];
String username = args[1];
String password = args[2];
String schema = args[3];
String table = args[4];
ClickZettaClient client = ClickZettaClient.newBuilder().url(jdbcUrl).username(username).password(password).build();
BulkloadStream bulkloadStream = client.newBulkloadStreamBuilder()
.schema(schema)
.table(table)
.operate(RowStream.BulkLoadOperate.APPEND)
.build();
for (int t = 0; t < 100; t++) {
Row row = bulkloadStream.createRow(0);
row.setValue("col1", Arrays.asList("first", "b", "c"));
final HashMap<Integer, String> map = new HashMap<Integer, String>();
map.put(t,"first"+t);
row.setValue("col2", map);
Map<String, Object> struct = new HashMap<>();
struct.put("x", t);
struct.put("y", t+1);
row.setValue("col3", struct);
bulkloadStream.apply(row, 0);
}
// 必须调用 stream close 接口,触发提交动作
bulkloadStream.close();
// 轮训提交状态,等待提交结束
while(bulkloadStream.getState() == StreamState.RUNNING) {
Thread.sleep(1000);
}
if (bulkloadStream.getState() == StreamState.FAILED) {
throw new RuntimeException(bulkloadStream.getErrorMessage());
}
client.close();
}
}
- Lakehouse url可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串以查看
