将数据导入云器Lakehouse的完整指南
数据入仓:通过Java SDK批量和实时加载数据
概述
云器Lakehouse提供了Java SDK,可以在流行的IDE(比如VS Code)里通过Java和SQL编程的方式,将数据加载到云器Lakehouse的表里。
使用场景
这种方式可以方便地批量加载数据,适合在Java编程环境里进行大量数据的批量加载和实时上传,因为云器Lakehouse针对大批量数据的写入进行了更多优化。
实现步骤
下载代码
从GitHub仓库里下载本指南的代码到本地(如果已下载请忽略)。
将目录加入到VS Code里。

预览
修改参数
将config/config-ingest-sample.json文件名改为config/config-ingest.json,并修改config-ingest.json各个参数值。
{ "username": "请输入您的用户名", "password": "请输入您的密码", "service": "请输入您的服务地址,例如 api.clickzetta.com", "instance": "请输入您的实例 ID", "workspace": "请输入您的工作空间,例如 gharchive", "schema": "请输入您的模式,例如 public", "vcluster": "请输入您的虚拟集群,例如 default_ap", "sdk_job_timeout": 10, "hints": { "sdk.job.timeout": 3, "query_tag": "a_comprehensive_guide_to_ingesting_data_into_clickzetta" } }
批量加载
在VS Code中运行BulkloadFile.java(文件名需与其中的公共类名BulkloadFile一致):

预览
import com.clickzetta.client.BulkloadStream; import com.clickzetta.client.ClickZettaClient; import com.clickzetta.client.RowStream; import com.clickzetta.client.StreamState; import com.clickzetta.platform.client.api.Row; import org.json.JSONObject; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.text.MessageFormat; import java.nio.file.Files; import java.nio.file.Paths; import org.apache.log4j.PropertyConfigurator; public class BulkloadFile { private static ClickZettaClient client; private static String service; private static String instance; private static String password; private static String table = "lift\_tuckets\_import\_by\_java\_sdk\_bulkload"; private static String workspace; private static String schema; private static String vc; private static String user; static BulkloadStream bulkloadStream; public static void main(String\[] args) throws Exception { // 加载 log4j 配置文件 PropertyConfigurator.configure("config/log4j.properties"); // 读取配置文件 String content = new String(Files.readAllBytes(Paths.get("config/config-ingest.json"))); JSONObject config = new JSONObject(content); // 从 JSON 配置文件中获取值 service = config.getString("service"); instance = config.getString("instance"); password = config.getString("password"); workspace = config.getString("workspace"); schema = config.getString("schema"); vc = config.getString("vcluster"); user = config.getString("username"); // 初始化 initialize(); // 统计文件里的行数 int fileLineCount = countFileLines("data/lift\_tickets\_data.csv"); System.out.println("Total lines in file: " + fileLineCount); // 创建表 createTable(); // 如果目标表存在,删除表里的数据 deleteTableData(); // 插入数据 insertData(); // 检查表中数据行数 int tableRowCount = countTableRows(); System.out.println("Total rows in table: " + tableRowCount); // 比较文件里的行数和表里的行数 if (fileLineCount == tableRowCount) { 
System.out.println("Data inserted successfully! The row count matches."); } else { System.out.println("Data insertion failed! The row count does not match."); } // 关闭客户端 client.close(); } private static void initialize() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" + "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); client = ClickZettaClient.newBuilder().url(url).build(); } private static int countFileLines(String filePath) throws Exception { try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { int lines = 0; while (reader.readLine() != null) lines++; return lines - 1; // 减去标题行 } } private static void createTable() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" + "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); String createTableSQL = "CREATE TABLE if not exists " + table + " (" + "\`txid\` string," + "\`rfid\` string," + "\`resort\` string," + "\`purchase\_time\` string," + "\`expiration\_time\` string," + "\`days\` int," + "\`name\` string," + "\`address\_street\` string," + "\`address\_city\` string," + "\`address\_state\` string," + "\`address\_postalcode\` string," + "\`phone\` string," + "\`email\` string," + "\`emergency\_contact\_name\` string," + "\`emergency\_contact\_phone\` string);"; try (Connection conn = DriverManager.getConnection(url, user, password); PreparedStatement pstmt = conn.prepareStatement(createTableSQL)) { pstmt.executeUpdate(); System.out.println("Table created successfully."); } catch (Exception e) { // 忽略该错误并继续 System.out.println("Ignoring exception: " + e.getMessage()); } } private static void deleteTableData() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" 
+ "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); try (Connection conn = DriverManager.getConnection(url, user, password); PreparedStatement pstmt = conn.prepareStatement("DELETE FROM " + schema + "." + table)) { pstmt.executeUpdate(); System.out.println("Data deleted successfully from table: " + table); } } private static void insertData() throws Exception { bulkloadStream = client.newBulkloadStreamBuilder().schema(schema).table(table) .operate(RowStream.BulkLoadOperate.APPEND) .build(); File csvFile = new File("data/lift\_tickets\_data.csv"); BufferedReader reader = new BufferedReader(new FileReader(csvFile)); // 跳过标题行 reader.readLine(); // Skip the first line (header) // 插入数据到数据库 String line; while ((line = reader.readLine()) != null) { String\[] values = line.split(","); // 类型转换保持和服务端类型一致 String id = values\[0]; // ID 是字符串类型 String contentValue = values\[1]; Row row = bulkloadStream.createRow(); // 设置参数值 row\.setValue(0, id); row\.setValue(1, contentValue); // 必须调用该方法,否则无法将数据发送到服务端 bulkloadStream.apply(row); } // 关闭资源 reader.close(); bulkloadStream.close(); waitForBulkloadCompletion(); } private static int countTableRows() throws Exception { String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" + "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); try (Connection conn = DriverManager.getConnection(url, user, password); Statement stmt = conn.createStatement()) { String countSQL = "SELECT COUNT(\*) FROM " + schema + "." 
+ table; try (ResultSet rs = stmt.executeQuery(countSQL)) { if (rs.next()) { return rs.getInt(1); } else { throw new Exception("Failed to count table rows."); } } } } private static void waitForBulkloadCompletion() throws InterruptedException { while (bulkloadStream.getState() == StreamState.RUNNING) { Thread.sleep(1000); } if (bulkloadStream.getState() == StreamState.FAILED) { throw new ArithmeticException(bulkloadStream.getErrorMessage()); } } }
查看运行结果:
实时加载
在VS Code运行StreamingInsert.java:
import com.clickzetta.client.ClickZettaClient; import com.clickzetta.client.RealtimeStream; import com.clickzetta.client.RowStream; import com.clickzetta.platform.client.api.Options; import com.clickzetta.platform.client.api.Row; import com.clickzetta.platform.client.api.Stream; import com.github.javafaker.Faker; import org.json.JSONObject; import org.apache.log4j.PropertyConfigurator; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.Statement; import java.text.MessageFormat; import java.util.Date; import java.util.Random; import java.util.UUID; import java.io.IOException; import java.util.Locale; public class StreamingInsert { private static ClickZettaClient client; private static String service; private static String instance; private static String password; private static String table = "lift\_tuckets\_import\_by\_java\_sdk\_realtime\_ingest"; private static String workspace; private static String schema; private static String vc; private static String user; static RealtimeStream realtimeStream; public static void main(String\[] args) throws Exception { // 加载 log4j 配置文件 PropertyConfigurator.configure("config/log4j.properties"); // 读取配置文件 String content = new String(Files.readAllBytes(Paths.get("config/config-ingest.json"))); JSONObject config = new JSONObject(content); // 从 JSON 配置文件中获取值 service = config.getString("service"); instance = config.getString("instance"); password = config.getString("password"); workspace = config.getString("workspace"); schema = config.getString("schema"); vc = config.getString("vcluster"); user = config.getString("username"); // 初始化 String url = MessageFormat.format("jdbc\:clickzetta://{1}.{0}/{2}?" 
+ "schema={3}\&username={4}\&password={5}\&virtualcluster={6}&", service, instance, workspace, schema, user, password, vc); Options options = Options.builder().build(); client = ClickZettaClient.newBuilder().url(url).build(); // 检查并创建目标表 checkAndCreateTable(url); realtimeStream = client.newRealtimeStreamBuilder() .operate(RowStream.RealTimeOperate.CDC) .options(options) .schema(schema) .table(table) .build(); Faker faker = new Faker(new Locale("zh", "CN")); String\[] resorts = {"Resort 1", "Resort 2", "Resort 3"}; Random random = new Random(); int duration = 200; int maxRetries = 3; // 记录开始时间 long startTime = System.currentTimeMillis(); System.out.println("Start time: " + new Date(startTime)); int totalRecords = 0; while (duration > 0) { for (int t = 1; t < 11; t++) { Row row = realtimeStream.createRow(Stream.Operator.UPSERT); row\.setValue("txid", UUID.randomUUID().toString()); row\.setValue("rfid", Long.toHexString(random.nextLong() & ((1L << 96) - 1))); row\.setValue("resort", faker.options().option(resorts)); row\.setValue("purchase\_time", new Date().toString()); row\.setValue("expiration\_time", new Date(System.currentTimeMillis() + 86400000L).toString()); row\.setValue("days", faker.number().numberBetween(1, 7)); row\.setValue("name", faker.name().fullName()); row\.setValue("address\_street", faker.address().streetAddress()); row\.setValue("address\_city", faker.address().city()); row\.setValue("address\_state", faker.address().state()); row\.setValue("address\_postalcode", faker.address().zipCode()); row\.setValue("phone", faker.phoneNumber().phoneNumber()); row\.setValue("email", faker.internet().emailAddress()); row\.setValue("emergency\_contact\_name", faker.name().fullName()); row\.setValue("emergency\_contact\_phone", faker.phoneNumber().phoneNumber()); int attempts = 0; boolean success = false; while (attempts < maxRetries && !success) { try { realtimeStream.apply(row); success = true; } catch (IOException e) { attempts++; System.err.println("Attempt 
" + attempts + " failed: " + e.getMessage()); if (attempts == maxRetries) { throw e; } Thread.sleep(1000); // 等待 1 秒后重试 } } totalRecords++; } Thread.sleep(200); duration = duration - 1; } realtimeStream.close(); client.close(); // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("End time: " + new Date(endTime)); // 计算平均每秒插入的记录数 double elapsedTimeInSeconds = (endTime - startTime) / 1000.0; double recordsPerSecond = totalRecords / elapsedTimeInSeconds; System.out.println("Total records inserted: " + totalRecords); System.out.println("Elapsed time (seconds): " + elapsedTimeInSeconds); System.out.println("Average records per second: " + recordsPerSecond); } private static void checkAndCreateTable(String url) throws Exception { String checkTableSQL = "SELECT 1 FROM " + schema + "." + table + " LIMIT 1"; String createTableSQL = "CREATE TABLE if not exists " + table + " (" + "\`txid\` string PRIMARY KEY," + "\`rfid\` string," + "\`resort\` string," + "\`purchase\_time\` string," + "\`expiration\_time\` string," + "\`days\` int," + "\`name\` string," + "\`address\_street\` string," + "\`address\_city\` string," + "\`address\_state\` string," + "\`address\_postalcode\` string," + "\`phone\` string," + "\`email\` string," + "\`emergency\_contact\_name\` string," + "\`emergency\_contact\_phone\` string);"; try (Connection conn = DriverManager.getConnection(url, user, password); Statement stmt = conn.createStatement()) { try (ResultSet rs = stmt.executeQuery(checkTableSQL)) { // 如果表存在,什么也不做 } catch (Exception e) { // 如果表不存在,创建表 try (PreparedStatement pstmt = conn.prepareStatement(createTableSQL)) { pstmt.executeUpdate(); System.out.println("Table created successfully."); } } } } }
查看运行结果:
下一步建议
通过Studio数据管理查看导入的数据。对导入的数据进行清洗和转换。通过Data GPT探查和分析导入的数据。
资料
Java SDK简介
Yunqi © 2024 Yunqi, Inc. All Rights Reserved.
联系我们