Table Stream功能
Table Stream是一种实时数据流,用于记录对表所做的数据操作语言(DML)更改,包括插入、更新和删除操作。同时,Table Stream还提供有关每次更改的元数据,以便您可以根据更改的数据采取相应操作。支持在Table、Dynamic Table、Materialized View、External Table(Kafka)创建Table Stream
Table Stream语法
Table、Dynamic Table、Materialized View创建Table Stream语法
-
<name>
:Table Stream的名称。 -
<table_name>
:要获取增量的基表。不支持view。 -
TIMESTAMP AS OF timestamp_expression
:(可选)指定Table Stream应从底层表开始接收更新的时间戳表达式。如果省略此参数,Table Stream将从当前时间开始接收更新。timestamp_expression
返回结果是一个标准的时间戳类型的表达式,TIMESTAMP AS OF指定的最早时间戳取决TIME TRAVEL(data_retention_days)参数,如果指定的版本不存在则会报错。如果未指定则使用当前时间戳的版本数据,例如:'2023-11-07 14:49:18'
,即可以强制转换为时间戳的字符串。cast('2023-11-07 14:49:18 Asia/Shanghai' as timestamp)
。current_timestamp() - interval '12' hours
。- 本身就是时间戳或可强制转换为时间戳的任何其他表达式。
-
COMMENT
:(可选)Table Stream的注释。 -
'TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD'
:(必选)值二选一,APPEND_ONLY和STANDARD。- APPEND_ONLY只记录对象的INSERT操作的数据。Update和delete操作不会记录。例如,最初在表中插入了10行,然后在点位没有移动的时候执行delete操作删除5行,Table Stream仍然记录10行操作。
- STANDARD模式:STANDARD模式下可以跟踪源对象的所有DML变化,包括插入、更新和删除(包括表截断)。这种提供行级别的变化是通过将所有变化的delta数据进行连接加工来提供行级别增量。Table Stream中的delta变化指的是在两个事务时间点之间发生的数据变化。例如,如果在Table Stream的offset之后,有一个行被插入,然后被更新,那么delta变化就是一个新的行。如果在stream的offset之后,有一个行被插入,然后被删除,那么delta变化就是没有这个行。换句话说,delta变化会反映源对象的最新状态,而不是历史变化。
-
SHOW_INITIAL_ROWS:可选参数,指定为TRUE时,创建stream时候记录下table当时的版本,第一次消费该stream,永远拿的是创建时的那个版本,同时第一次消费时,所有数据都是insert模式,第一次消费结束后,后续的行为就和之前行为一样,从某个版本到下次版本之间的Delta数据。
注意事项
- 在创建Table Stream之前,必须在基表上执行以下操作:
- TIMESTAMP AS OF指定的最早时间戳取决TIME TRAVEL(data_retention_days)参数,如果指定的版本不存在则会报错。此参数定义了在被删除数据被保留的时间长度,Lakehouse默认保留数据一天。根据您的业务需求,您可以通过调整
data_retention_days
参数来延长或缩短数据的保留周期。请注意,调整数据保留周期可能会影响存储成本。延长保留周期会增加存储需求,从而可能增加相关的费用。 - 通过实时上传数据写入的数据一分钟之后才可以读取,Table Stream 只能读取已经提交的数据。实时任务写入的数据需要等待 1 分钟才能确认,所以 Table Stream 也要等 1 分钟才能看到。
使用案例
案例1:创建APPEND_ONLY模式的Table Stream
案例2:创建STANDARD模式的Table Stream
Kafka Table Stream
支持在Kafka外部表上创建表流(Table Stream),实现实时消费Topic数据.
语法
参数说明
TIMESTAMP AS OF
:指定消费起始时间点(可选),支持格式:- 明确时间戳:
'2023-01-01 12:00:00'
- 时间函数:
CURRENT_TIMESTAMP() - INTERVAL '1' HOUR
- 明确时间戳:
show_initial_rows
:控制初始数据加载行为:true
:加载从外部表创建时指定的点位到Table Stream指定的点位之间的历史数据false
:从最新点位(latest
)开始消费,不加载历史数据(默认值)
table_stream_mode
: 固定为append_only
,仅处理Kafka新增数据(不支持更新/删除操作)
示例
- Kafka 外部表创建 首先,您需要创建一个 Kafka 外部表。这是创建 Table Stream 的基础。以下是创建 Kafka 外部表的语法:
创建Kafka Connection
创建Kafka外部表
external_table_kafka
是外部表的名称。key_column
和value_column
分别代表 Kafka 消息的键和值,其中value_column
是必需的。USING kafka
指定了使用 Kafka 作为数据源。OPTIONS
部分包含了 Kafka 消费者配置,如消费者组 ID (group_id
) 和要订阅的主题 (topics
)。CONNECTION pipe_kafka
指定了与 Kafka 的连接配置,这通常包括 Kafka 集群的地址和其他连接参数。
- 创建 Table Stream
在 Kafka 外部表的基础上,您可以创建一个 Table Stream,用于实时处理 Kafka 数据流。以下是创建 Table Stream 的语法: