Lakehouse Bulkload 快速入门

简介

Bulkload 是 Lakehouse 提供的一种面向高吞吐量设计的批量数据写入接口,特别适合处理大量连续性写入场景。使用 Bulkload 写入的数据在提交后即可查看。

批量导入原理

批量上传的SDK提供了一种高效的数据导入机制,适用于云器Lakehouse。以下是其工作原理的简化描述和流程图:

  1. 数据上传:通过SDK,您的数据首先被上传到对象存储服务。这一步骤的性能受到本地网络速度和并发连接数的影响。
  2. 触发导入:数据上传完成后,当您调用bulkloadStream.close()方法时,SDK会自动触发一个SQL命令,将数据从对象存储导入到Lakehouse的表中。不建议您在一个任务中频繁调用bulkloadStream.close()bulkloadStream.close()最后只能调用一次。
  3. 计算资源:上传数据建议选择通用型计算集群(GENERAL PURPOSE VIRTUAL CLUSTER)通用型计算资源更加适合跑批量作业和加载数据作业,数据从对象存储到Lakehouse表的导入速度取决于您配置的计算资源的大小。
  4. 分片上传优化:处理大于1GB的压缩数据时,建议在createRow方法中为每个并发线程或进程分配唯一的分片ID。这种做法能够充分发挥多线程或多进程的并行处理优势,显著提升数据导入的效率。最佳实践是,根据并发的数量来确定分片ID的数量,确保每个并发对应一个独立的分片ID。如果多个并发被分配了相同的分片ID,最终写入的数据可能会发生覆盖,导致先前的数据丢失。为确保所有分片的数据都被正确导入表中,请在所有并发操作完成后,调用bulkloadStream.close()方法来提交整个导入任务。

以下是批量导入原理的流程图:

[SDK上传数据] ──> [对象存储] ──> [调用bulkloadStream.close()]
                                ↓
                         [触发SQL命令] ──> [Lakehouse表]

适用场景

批量上传文件的SDK特别适用于以下情况:

  • 一次性大量数据导入:当您需要导入大量数据时,无论是一次性的批量任务还是周期性但间隔较长的操作。
  • 导入频率不高:如果您的导入数据频率不高(时间间隔大于五分钟),即使单次导入的数据量不大,使用批量导入SDK也是合适的。

不适用场景

批量上传文件的SDK不适用于以下情况:

  • 实时数据导入:如果您需要在极短时间内(如5分钟之内)频繁导入数据,建议使用实时数据接口,以满足对实时性的要求。

写入限制

请注意,BulkloadStream 不支持主键(pk)表的写入。

创建 BulkloadStream

要通过 ClickZetta 客户端创建一个批量写入流,请参考以下示例代码:

RowStream stream = client.newBulkloadStreamBuilder()
        .schema(schema)
        .table(TABLE_NAME)
        .operate(RowStream.BulkLoadOperate.APPEND)
        .build();

Options

在clickzetta-java版本3.0.18以后提供了options。options用于指定上传选项入指定分取

bulkloadStream=client.newBulkloadStreamBuilder().schema(schema).table(table)
                .options(BulkLoadOptions.newBuilder().withPartitionSpecs(Optional.of("your_partition_spec")).build())

                .operate(RowStream.BulkLoadOperate.APPEND)
                .build();
  • withPartitionSpecs 用于指定目标表的分区信息,控制数据写入的分区行为。

    • 非分区表:忽略此参数或设置为
    • 分区表:
      • 静态分区写入,需要将所有数据写入指定的固定分区,无论源数据中分区列的实际值是什么,写入目标表时都会使用 partition_spec 指定的分区值,所有数据都会写入到同一个指定分区中。参数格式是'分区列1=值1,分区列2=值2'
        • 动态分区写入,根据数据中分区列的实际值,自动写入到对应分区。忽略此参数,系统根据数据中分区列的值自动创建或写入相应分区

操作类型

在创建 Bulkload 时,可以通过 operate 方法指定以下操作类型:

  • RowStream.BulkLoadOperate.APPEND:追加模式,向表中添加数据。

    bulkloadStream=client.newBulkloadStreamBuilder().schema(schema).table(table)
            .operate(RowStream.BulkLoadOperate.APPEND)
            .build();
  • RowStream.BulkLoadOperate.OVERWRITE:覆盖模式,删除表中现有数据后再写入新数据。

    bulkloadStream=client.newBulkloadStreamBuilder().schema(schema).table(table)
            .operate(RowStream.BulkLoadOperate.OVERWRITE)
            .build();

写入数据

使用 Row 对象表示要写入的具体数据。通过调用 row.setValue 方法将数据封装到 Row 对象中。

Row row = stream.createRow(0);
row.setValue("id", t);
row.setValue("name", String.valueOf(t));
stream.apply(row, 0);
  • createRow 方法创建 Row 对象时,需要传入一个整数作为分片 ID。这个 ID 可以配合多线程/进程技术,用多个互补相同的分片 ID 来写入数据,从而有效提升写入数据的速度。
  • setValue 方法的第一个参数为字段名,第二个参数为具体的数据。要求数据类型与表类型一致。
  • apply 方法用于写入数据,需要指定 Row 对象以及相应的分片 ID。

写入复杂类型数据

// 写入数组
row.setValue("col1", Arrays.asList("first", "b", "c"));

// 写入映射
final HashMap<Integer, String> map = new HashMap<Integer, String>();
map.put(t, "first" + t);
row.setValue("col2", map);

// 写入结构体
Map<String, Object> struct = new HashMap<>();
struct.put("first", "first-" + i);
struct.put("second", i);
row.setValue("col3", struct);

提交数据

批量写入的数据只有在提交之后才可见。因此,提交过程非常重要。

bulkloadStream.close();
  • 通过 bulkloadStream.getState() 获取 BulkloadStream 的状态。
  • 如果提交失败,可以通过 bulkloadStream.getErrorMessage() 获取错误信息。

使用示例

以下是一个使用 Bulkload 写入复杂类型数据的示例:

// 建表 create table complex_type(col1 array<string>,col2 map<int,string>, col3 struct<x:int,y:int>);
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.platform.client.api.Options;
import com.clickzetta.platform.client.api.Row;
import com.clickzetta.platform.client.api.Stream;

public class BulkloadStreamDemo {
    public static void main(String[] args) throws Exception{
        if (args.length != 5) {
            System.out.println("input arguments: jdbcUrl, username, password");
            System.exit(1);
        }
        String jdbcUrl = args[0];
        String username = args[1];
        String password = args[2];
        String schema = args[3];
        String table = args[4];

        ClickZettaClient client = ClickZettaClient.newBuilder().url(jdbcUrl).username(username).password(password).build();


        BulkloadStream bulkloadStream = client.newBulkloadStreamBuilder()
                .schema(schema)
                .table(table)
                .operate(RowStream.BulkLoadOperate.APPEND)
                .build();

        for (int t = 0; t < 100; t++) {
            Row row = bulkloadStream.createRow(0);
            row.setValue("col1", Arrays.asList("first", "b", "c"));
            final HashMap<Integer, String> map = new HashMap<Integer, String>();
            map.put(t,"first"+t);
            row.setValue("col2", map);
            Map<String, Object> struct = new HashMap<>();
            struct.put("x", t);
            struct.put("y", t+1);
            row.setValue("col3", struct);
            bulkloadStream.apply(row, 0);
        }
        // 必须调用 stream close 接口,触发提交动作
        bulkloadStream.close();

        // 轮训提交状态,等待提交结束
        while(bulkloadStream.getState() == StreamState.RUNNING) {
            Thread.sleep(1000);
        }
        if (bulkloadStream.getState() == StreamState.FAILED) {
            throw new RuntimeException(bulkloadStream.getErrorMessage());
        }
        client.close();
    }
}
  • Lakehouse url可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串以查看

联系我们
预约咨询
微信咨询
电话咨询