实时写入原理
实时写入Lakehouse的SDK是一种高效的数据流处理工具,它允许用户将数据实时地上传并存储到Lakehouse中。以下是实时写入的工作原理:
- SDK上传数据:用户通过SDK将数据实时上传到Lakehouse的Ingestion Service。
- Ingestion Service处理:Ingestion Service接收到数据后,直接将数据写入到Lakehouse的表中,此时数据以临时中间文件的形式存储,这个阶段称为混合表。
- 查询实时数据:在数据提交之前,用户即可查询(select)到这些实时写入的新数据,但这些数据对于table stream、materialized view和dynamic table来说是不可见的。
- 数据提交:新写入的数据会在大约一分钟后自动提交,提交后,table stream、materialized view和dynamic table都能够读取这部分数据。
- 混合表变成普通表:在数据提交后,后台进程会将混合表合并变成普通表,合并完成后用户可以执行更新操作(update/merge/delete)。
适用场景
实时写入Lakehouse的SDK适用于以下场景:
- 短间隔数据导入:如果您的应用场景要求在非常短的时间间隔内(如5分钟或更短)导入数据,实时写入SDK可以满足您的需求。
- 频繁小量数据提交:对于需要频繁提交数据,但每次提交的数据量不大的情况,实时写入SDK提供了一个高效的解决方案。
- 实时数据分析:实时写入SDK适合需要对数据进行即时分析和响应的应用,例如实时监控、事件追踪和实时报告等。
注意事项
- 实时写入的数据可以秒级查询。
- 实时写入数据目前只能通过内部提供的 Flink Connector 中支持 schema change 的 sink 算子(单并发)来实现实时的表结构变化感知。在其他场景下进行表结构更改时,需要先停止实时写入任务,然后在表结构变更后一段时间(大约90分钟)后,重新启动任务。
- table stream、materialized view 和 dynamic table 只能显示已经提交的数据。实时任务写入的数据需要等待 1 分钟才能确认,因此 table stream 也需要等待 1 分钟才能看到。
通过客户端创建实时数据流
要创建实时数据流,首先需要使用 Lakehouse 客户端(client):
operate:传入一个枚举值,实时接口支持 RowStream.RealTimeOperate.APPEND_ONLY 和 RowStream.RealTimeOperate.CDC。 options:用于传入实时写入流的参数,具体见下文的 options 说明。
选项(Options)
可以将以下选项(options)传入到实时数据流中,用于控制写入数据的行为。这些参数均为选填,推荐使用默认参数。
参数说明
| 类别 | 参数 | 默认值 | 说明 |
|---|---|---|---|
| 写入性能相关参数 | withFlushMode | FlushMode.AUTO_FLUSH_BACKGROUND | 数据刷写方式,目前支持:<br>FlushMode.AUTO_FLUSH_SYNC:每次等待上次刷新完成后才进行下一步写入。<br>FlushMode.AUTO_FLUSH_BACKGROUND:异步刷新,允许多个写入同时进行,不需要等待前一次写入完成。 |
| 写入性能相关参数 | withMutationBufferLinesNum | 100 | 每个 buffer 中的数据条数的积攒限制,积攒达到后则会发送至服务端。当达到此限制时,数据将被实际 flush 提交至服务端。如果一次导入的数据量达到 MB 级别,可以调大此参数以加快导入速度。数据提交至服务端的条件是 MutationBufferLinesNum 或 withFlushInterval 中任意一个优先达到。 |
| withMutationBufferMaxNum | 5 | 在数据提交过程中,withMutationBufferLinesNum 指定了在达到一定数量的数据条目后进行发送,这是一个触发异步发送的阈值。而 withMutationBufferMaxNum 则定义了可以同时存在的缓冲区(buffer)的最大数量。即使前一个缓冲区的数据还未完全写入,只要缓冲区的数量没有超过 withMutationBufferMaxNum 指定的限制,就可以继续向新的缓冲区写入数据。这允许系统在处理和发送数据时实现更高的并发性,因为不必等待所有缓冲区都清空才能继续写入新数据。简而言之,withMutationBufferMaxNum 相当于jdbc连接池 | |
| 写入性能相关参数 | withMutationBufferSpace | 5 * 1024 * 1024 (5MB) | 当达到此限制时,数据将被实际 flush 提交至服务端。如果一次导入的数据量达到 MB 级别,可以调大此参数以加快导入速度。数据提交至服务端的条件是 withMutationBufferSpace 或 withMutationBufferLinesNum 中任意一个优先达到。 |
| 写入性能相关参数 | withFlushInterval | 10 * 1000 (10秒) | 当达到此时间限制时,数据将被实际 flush 提交至服务端。数据提交至服务端的条件是 withMutationBufferSpace 或 withMutationBufferLinesNum 中任意一个优先达到。 |
| 重试机制相关参数 | withRequestFailedRetryEnable | FALSE | 是否开启 mutate 失败重试机制。取值为 TRUE 或 FALSE。 |
| withRequestFailedRetryTimes | 5 | mutate 失败,重试最大次数。 | |
| 重试机制相关参数 | withRequestFailedRetryInternalMs | 5000 (5秒) | 失败重试间隔时间,单位毫秒(ms)。 |
| withRequestFailedRetryLogDebugEnable | FALSE | 是否开启debug日志 | |
| withRequestFailedRetryStatus | RegisterStatus.RetryStatus.THROTTLED | 根据哪种错误原因进行重试,默认是RegisterStatus.RetryStatus.THROTTLED。多个值使用逗号隔开 取值 RegisterStatus.RetryStatus.THROTTLED RegisterStatus.RetryStatus.INTERNAL_ERROR RegisterStatus.RetryStatus.FAILED RegisterStatus.RetryStatus.PRECHECK_FAILED |
写入数据(Row)
通过 stream.createRow 方法创建具体的数据对象(Row),并通过 row.setValue 方法将数据封装到 Row 对象中。
Row row = stream.createRow(Stream.Operator.INSERT);
row.setValue("id", t);
row.setValue("name", String.valueOf(t));
stream.apply(row);
-
当 Stream 创建为
RowStream.RealTimeOperate.APPEND_ONLY时,仅能创建Stream.Operator.INSERT类型的 Row。 -
当 Stream 创建为
RowStream.RealTimeOperate.CDC时,以下所有 Row 类型均可使用:Stream.Operator.INSERT:插入行,如果目标行已存在则报错。Stream.Operator.DELETE:删除行,如果目标行不存在则报错。Stream.Operator.UPDATE:更新行,如果目标行不存在则报错。Stream.Operator.UPSERT:插入行,如果目标行已存在则更新该行。Stream.Operator.INSERT_IGNORE:插入行,如果目标行已存在则自动忽略。
数据提交到服务端
通过调用 ((RealtimeStream)stream).flush() 方法,数据会提交到服务端。如果没有调用该方法,则数据根据 withMutationBufferSpace、withMutationBufferLinesNum 或 withFlushInterval 中任意一个条件优先达到时,数据将被发送到服务端。
具体案例
普通表追加写入
写入vector类型
创建表
使用Java写入
CDC实时写入
Lakehouse支持数据库的CDC(Change Data Capture)功能,以流的方式将数据写入到Lakehouse表中,并实时更新表数据。同步过程通过RealtimeStream实时更新插入和删除行操作来实现。同时,支持使用Flink connector和IGS SDK进行数据写入。在创建表时,需要设置主键以确保数据的唯一性和一致性。
创建表
在创建Lakehouse表时,需要指定主键。CDC写入会根据主键进行数据去重,以确保数据的准确性。虽然创建的主键表支持SQL操作,建议通过实时写入流进行数据写入。以下是一个创建表的示例:
IGS SDK实时写入流
创建实时写入流
使用IGS SDK创建实时写入流,需要指定操作类型(CDC)和相关选项。以下是一个创建实时写入流的示例:
指定操作类型
根据需求,可以指定不同的操作类型:
Stream.Operator.UPSERT:插入或更新行。如果目标行不存在,则插入;如果已存在,则更新。Stream.Operator.DELETE_IGNORE:删除行。如果目标行不存在,则自动忽略。
使用原生Java Sdk写入
使用Lakehouse实时同步功能写入
参考文档多表实时同步
使用FLINK CONNECTOR写入
Flink connector是基于RealtimeStream SDK封装的,用于实现实时数据同步。查看Flink Connector
