使用Java SDK批量上传数据
本文档主要介绍如何使用Java SDK的Bulkloadstream如何批量将数据加载到Lakehouse中,适合一次性大量数据导入,支持自定义数据源,提供数据导入的灵活性。本次案例以读取本地文件为例,如果您的数据源在对象存储或者Lakehouse Studio数据集成支持的范围内,推荐您使用COPY命令或者数据集成
参考文档
Java SDK批量上传数据
应用场景
- 适用于需要批量上传大量数据的业务场景。
- 适合熟悉Java并需要自定义数据导入逻辑的开发人员。
使用限制
- BulkloadStream不支持主键(pk)表的写入。
- 不适用于时间间隔小于五分钟的频繁数据上传场景。
使用案例
本案例以读取本地CSV文件为例,本次使用的数据集是巴西电子商务公共数据集中的olist_order_items_dataset数据。推荐在数据源位于对象存储或Lakehouse Studio支持的数据集成范围内时,使用COPY命令或数据集成。
前置条件
-
创建表
-
create table bulk_order_items (
order_id STRING,
order_item_id INT,
product_id STRING,
seller_id STRING,
shipping_limit_date STRING,
price DOUBLE,
freight_value DOUBLE
);
-
对目标表具有INSERT权限。
使用Java代码开发
Maven依赖
在项目的pom.xml
文件中添加Lakehouse的Maven依赖,Lakehouse maven最新依赖可以在maven库中找到
<dependency>
<groupId>com.clickzetta</groupId>
<artifactId>clickzetta-java</artifactId>
<version>1.3.1</version>
</dependency>
编写Java代码
- 初始化Lakehouse客户端和BulkloadStream:创建
BulkloadFile
类,初始化Lakehouse连接和BulkloadStream对象。
- 读取本地CSV文件并写入Lakehouse:使用Java IO流读取本地CSV文件,并将数据逐行写入Lakehouse。
import com.clickzetta.client.BulkloadStream;
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.client.StreamState;
import com.clickzetta.platform.client.api.Row;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.text.MessageFormat;
public class BulkloadFile {
private static ClickZettaClient client;
private static final String password = "";
private static final String table = "bulk_order_items";
private static final String workspace = "";
private static final String schema = "public";
private static final String vc = "default";
private static final String user = "";
static BulkloadStream bulkloadStream;
public static void main(String[] args) throws Exception {
initialize();
File csvFile = new File("olist_order_items_dataset.csv");
BufferedReader reader = new BufferedReader(new FileReader(csvFile));
// Skip the header row
reader.readLine(); // Skip the first line (header)
// Insert data into the database
String line;
while ((line = reader.readLine()) != null) {
String[] values = line.split(",");
//类型转化保持和服务端类型一致
String orderId = values[0];
int orderItemId = Integer.parseInt(values[1]); // Convert order_item_id to int
String productId = values[2];
String sellerId = values[3];
String shippingLimitDate = values[4];
double price = Double.parseDouble(values[5]);
double freightValue = Double.parseDouble(values[6]);
Row row = bulkloadStream.createRow();
// Set parameter values
row.setValue(0, orderId);
row.setValue(1, orderItemId);
row.setValue(2, productId);
row.setValue(3, sellerId);
row.setValue(4, shippingLimitDate);
row.setValue(5, price);
row.setValue(6, freightValue);
//必须调用该方法,否则无法将数据发送到服务端
bulkloadStream.apply(row);
}
// Close resources
reader.close();
bulkloadStream.close();
waitForBulkloadCompletion();
client.close();
System.out.println("Data inserted successfully!");
}
private static void initialize() throws Exception {
String url = MessageFormat.format("jdbc:clickzetta://jnsxwfyr.uat-api.clickzetta.com/{0}?" +
"schema={1}&username={2}&password={3}&virtualcluster={4}&",
workspace, schema, user, password, vc);
client = ClickZettaClient.newBuilder().url(url).build();
bulkloadStream = client.newBulkloadStreamBuilder().schema(schema).table(table)
.operate(RowStream.BulkLoadOperate.APPEND)
.build();
}
private static void waitForBulkloadCompletion() throws InterruptedException {
while (bulkloadStream.getState() == StreamState.RUNNING) {
Thread.sleep(1000);
}
if (bulkloadStream.getState() == StreamState.FAILED) {
throw new ArithmeticException(bulkloadStream.getErrorMessage());
}
}
}