Lakehouse Java SDK 简介
Lakehouse 为您提供名为 clickzetta-java
的 Java SDK,通过这个统一的 SDK,我们为用户提供以下功能:
- 标准 type 4 JDBC 驱动,方便您通过 JDBC 访问 ClickZetta Lakehouse
- 实时数据写入 SDK,支持您将实时数据快速写入 ClickZetta Lakehouse
- 批量数据写入 SDK,支持您将大量数据高效地写入 ClickZetta Lakehouse
如何获取
您可以通过 Maven 依赖的方式引入 clickzetta-java
SDK:
<dependency>
<groupId>com.clickzetta</groupId>
<artifactId>clickzetta-java</artifactId>
<version>${version}</version>
</dependency>
直接点击maven库在库中搜索
clickzetta-java
可以获取到最新的更新版本记录
注意事项
clickzetta-java
支持 Java 8 及以上版本。
- 当使用 Java 9 及以上版本时,需要添加 JVM 启动参数
--add-opens=java.base/java.nio=ALL-UNNAMED
以确保正常运行。
常见问题及解决方案
-
问题描述:javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
解决方案:您可能使用了 Java 的较早期版本(如 11.0.1+13),受 JDK 对 TLSv1.3 实现的问题 JDK-8211806 影响导致。我们推荐您升级到更新的稳定生产版本。如果无法进行 Java 版本更换,可以添加如下 Java 启动参数规避:-Djdk.tls.client.protocols=TLSv1.2
使用示例
1. 使用 JDBC 驱动连接 ClickZetta Lakehouse
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class ClickZettaJDBCExample {
public static void main(String[] args) {
String url = "jdbc:clickzetta://your-lakehouse-url";
String user = "your-username";
String password = "your-password";
try {
Class.forName("com.clickzetta.client.jdbc.ClickZettaDriver");
Connection connection = DriverManager.getConnection(url, user, password);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM schema.your_table");
while (resultSet.next()) {
System.out.println( resultSet.getString(1));
}
resultSet.close();
statement.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
- Lakehouse url可以在Lakehouse Studio管理-》工作空间中看到jdbc连接串以查看

2. 使用实时数据写入 SDK 向 ClickZetta Lakehouse 发送数据
// 建表 create table ingest_stream(id int,name string);
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.platform.client.api.Options;
import com.clickzetta.platform.client.api.Row;
import com.clickzetta.platform.client.api.Stream;
import com.clickzetta.client.RealtimeStream;
public class RealtimeStreamDemo {
public static void main(String[] args) throws Exception {
if (args.length != 5) {
System.out.println("input arguments: jdbcUrl, username, password");
System.exit(1);
}
String jdbcUrl = args[0];
String username = args[1];
String password = args[2];
String schema = args[3];
String table = args[4];
ClickZettaClient client = ClickZettaClient.newBuilder().url(jdbcUrl).username(username).password(password).build();
Options options = Options.builder().build();
RowStream stream = client.newRealtimeStreamBuilder()
.operate(RowStream.RealTimeOperate.APPEND_ONLY)
.options(options)
.schema(schema)
.table(table)
.build();
for (int t = 0; t < 1000; t++) {
Row row = stream.createRow(Stream.Operator.INSERT);
row.setValue("id",t);
row.setValue("name", String.valueOf(t));
stream.apply(row);
}
// 调用 flush 之后数据会提交值服务端,如果不调用则根据上面刷新方式指定的参数写入。比如withFlushInterval
((RealtimeStream)stream).flush();
// 必须调用 stream close接口,close 时会隐含执行 flush
stream.close();
client.close();
}
}
3. 使用批量数据写入 SDK 向 ClickZetta Lakehouse 发送数据
// 建表 create table complex_type(col1 array<string>,col2 map<int,string>, col3 struct<x:int,y:int>);
import com.clickzetta.client.BulkloadStream;
import com.clickzetta.client.ClickZettaClient;
import com.clickzetta.client.RowStream;
import com.clickzetta.client.StreamState;
import com.clickzetta.platform.client.api.Options;
import com.clickzetta.platform.client.api.Row;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
public class BulkloadStreamDemo {
public static void main(String[] args) throws Exception{
if (args.length != 5) {
System.out.println("input arguments: jdbcUrl, username, password");
System.exit(1);
}
String jdbcUrl = args[0];
String username = args[1];
String password = args[2];
String schema = args[3];
String table = args[4];
ClickZettaClient client = ClickZettaClient.newBuilder().url(jdbcUrl).username(username).password(password).build();
Options options = Options.builder().build();
BulkloadStream bulkloadStream = client.newBulkloadStreamBuilder()
.schema(schema)
.table(table)
.operate(RowStream.BulkLoadOperate.APPEND)
.build();
for (int t = 0; t < 100; t++) {
Row row = bulkloadStream.createRow(0);
row.setValue("col1", Arrays.asList("first", "b", "c"));
final HashMap<Integer, String> map = new HashMap<Integer, String>();
map.put(t,"first"+t);
row.setValue("col2", map);
Map<String, Object> struct = new HashMap<>();
struct.put("x", t);
struct.put("y", t+1);
row.setValue("col3", struct);
bulkloadStream.apply(row, 0);
}
// 必须调用 stream close 接口,触发提交动作
bulkloadStream.close();
// 轮训提交状态,等待提交结束
while(bulkloadStream.getState() == StreamState.RUNNING) {
Thread.sleep(1000);
}
if (bulkloadStream.getState() == StreamState.FAILED) {
throw new RuntimeException(bulkloadStream.getErrorMessage());
}
client.close();
}
}