使用Java SDK批量上传数据

本文档主要介绍如何使用Java SDK的Bulkloadstream如何批量将数据加载到Lakehouse中,适合一次性大量数据导入,支持自定义数据源,提供数据导入的灵活性。本次案例以读取本地文件为例,如果您的数据源在对象存储或者Lakehouse Studio数据集成支持的范围内,推荐您使用COPY命令或者数据集成

参考文档

Java SDK批量上传数据

应用场景

  • 适用于需要批量上传大量数据的业务场景。
  • 适合熟悉Java并需要自定义数据导入逻辑的开发人员。

使用限制

  • BulkloadStream不支持主键(pk)表的写入。
  • 不适用于时间间隔小于五分钟的频繁数据上传场景。

使用案例

本案例以读取本地CSV文件为例,本次使用的数据集是巴西电子商务公共数据集中的olist_order_items_dataset数据。推荐在数据源位于对象存储或Lakehouse Studio支持的数据集成范围内时,使用COPY命令或数据集成。

前置条件

  • 创建表

    • create    table bulk_order_items (
                order_id STRING,
                order_item_id INT,
                product_id STRING,
                seller_id STRING,
                shipping_limit_date STRING,
                price DOUBLE,
                freight_value DOUBLE
                );
  • 对目标表具有INSERT权限。

使用Java代码开发

Maven依赖

在项目的pom.xml文件中添加Lakehouse的Maven依赖,Lakehouse maven最新依赖可以在maven库中找到

<dependency>
    <groupId>com.clickzetta</groupId>
    <artifactId>clickzetta-java</artifactId>
    <version>1.3.1</version>
</dependency>

编写Java代码

  1. 初始化Lakehouse客户端和BulkloadStream:创建BulkloadFile类,初始化Lakehouse连接和BulkloadStream对象。
  2. 读取本地CSV文件并写入Lakehouse:使用Java IO流读取本地CSV文件,并将数据逐行写入Lakehouse。

import com.clickzetta.client.BulkloadStream;
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.client.StreamState;
import com.clickzetta.platform.client.api.Row;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.text.MessageFormat;

public class BulkloadFile {
    private static ClickZettaClient client;
    private static final String password = "";
    private static final String table = "bulk_order_items";
    private static final String workspace = "";
    private static final String schema = "public";
    private static final String vc = "default";
    private static final String user = "";
    static BulkloadStream bulkloadStream;
    public static void main(String[] args) throws Exception {
        initialize();
        File csvFile = new File("olist_order_items_dataset.csv");
        BufferedReader reader = new BufferedReader(new FileReader(csvFile));
        // Skip the header row
        reader.readLine(); // Skip the first line (header)
        // Insert data into the database
        String line;

        while ((line = reader.readLine()) != null) {
            String[] values = line.split(",");
            //类型转化保持和服务端类型一致
            String orderId = values[0];
            int orderItemId = Integer.parseInt(values[1]); // Convert order_item_id to int
            String productId = values[2];
            String sellerId = values[3];
            String shippingLimitDate = values[4];
            double price = Double.parseDouble(values[5]);
            double freightValue = Double.parseDouble(values[6]);
            Row row = bulkloadStream.createRow();
            // Set parameter values
            row.setValue(0, orderId);
            row.setValue(1, orderItemId);
            row.setValue(2, productId);
            row.setValue(3, sellerId);
            row.setValue(4, shippingLimitDate);
            row.setValue(5, price);
            row.setValue(6, freightValue);
         //必须调用该方法,否则无法将数据发送到服务端
            bulkloadStream.apply(row);
        }
        // Close resources
        reader.close();
        bulkloadStream.close();
        waitForBulkloadCompletion();
        client.close();
        System.out.println("Data inserted successfully!");
    }
    private static void initialize() throws Exception {
        String url = MessageFormat.format("jdbc:clickzetta://jnsxwfyr.uat-api.clickzetta.com/{0}?" +
                        "schema={1}&username={2}&password={3}&virtualcluster={4}&",
                workspace, schema, user, password, vc);
        client = ClickZettaClient.newBuilder().url(url).build();
        bulkloadStream = client.newBulkloadStreamBuilder().schema(schema).table(table)
                .operate(RowStream.BulkLoadOperate.APPEND)
                .build();
    }
    private static void waitForBulkloadCompletion() throws InterruptedException {
        while (bulkloadStream.getState() == StreamState.RUNNING) {
            Thread.sleep(1000);
        }
        if (bulkloadStream.getState() == StreamState.FAILED) {
            throw new ArithmeticException(bulkloadStream.getErrorMessage());
        }
    }

}

联系我们
预约咨询
微信咨询
电话咨询