CREATE TABLE STREAM
创建 Table Stream,用于捕获表的增量变更(INSERT / UPDATE / DELETE)。可以把 Stream 理解为"表上的消费游标"——它记录你上次读到哪个版本,下次查询时只返回那之后的变化行。
选型定位:捕获表的行级变更,驱动增量 ETL 或跨表同步;如果只需要自动维护加工结果,用 Dynamic Table;如果只需要持续从外部导入数据,用 Pipe。
Table Stream 语法
支持在 Table、Dynamic Table、Materialized View、External Table(Kafka)上创建 Table Stream;也支持在 External Volume 上创建 Volume Stream,用于监听对象存储中的文件变更事件。
Table、Dynamic Table、Materialized View 创建 Table Stream 语法
-
:Table Stream的名称。<name> -
:要获取增量的基表。不支持view。<table_name> -
:(可选)指定Table Stream应从底层表开始接收更新的时间戳表达式。如果省略此参数,Table Stream将从当前时间开始接收更新。TIMESTAMP AS OF timestamp_expression
返回结果是一个标准的时间戳类型的表达式,TIMESTAMP AS OF指定的最早时间戳取决TIME TRAVEL(data_retention_days)参数,如果指定的版本不存在则会报错。如果未指定则使用当前时间戳的版本数据,例如:timestamp_expression
,即可以强制转换为时间戳的字符串。'2023-11-07 14:49:18'
。cast('2023-11-07 14:49:18 Asia/Shanghai' as timestamp)
。current_timestamp() - interval '12' hours- 本身就是时间戳或可强制转换为时间戳的任何其他表达式。
-
:(可选)Table Stream的注释。COMMENT -
:(必选)值二选一,APPEND_ONLY和STANDARD。'TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD'- APPEND_ONLY只记录对象的INSERT操作的数据。Update和delete操作不会记录。例如,最初在表中插入了10行,然后在点位没有移动的时候执行delete操作删除5行,Table Stream仍然记录10行操作。
- STANDARD模式:STANDARD模式下可以跟踪源对象的所有DML变化,包括插入、更新和删除(包括表截断)。这种提供行级别的变化是通过将所有变化的delta数据进行连接加工来提供行级别增量。Table Stream中的delta变化指的是在两个事务时间点之间发生的数据变化。例如,如果在Table Stream的offset之后,有一个行被插入,然后被更新,那么delta变化就是一个新的行。如果在stream的offset之后,有一个行被插入,然后被删除,那么delta变化就是没有这个行。换句话说,delta变化会反映源对象的最新状态,而不是历史变化。
-
SHOW_INITIAL_ROWS:可选参数,控制建 Stream 时表中已有数据的可见性:
(默认):建 Stream 时表中已有的数据不可见,只捕获建 Stream 之后发生的变更。'FALSE'
:建 Stream 时表中已有的数据以'TRUE'
形式暴露在首次消费中,消费完初始快照后,后续变更正常产生INSERT
/UPDATE_BEFORE
/UPDATE_AFTER
。DELETE
注意:
只影响建 Stream 时历史数据是否可见,不影响SHOW_INITIAL_ROWS
的取值。无论该参数为何值,STANDARD 模式下 UPDATE 均产生__change_type
+UPDATE_BEFORE
两行,DELETE 均产生UPDATE_AFTER
记录。DELETE
注意事项
指定的最早时间戳取决于 TIME TRAVEL(TIMESTAMP AS OF
)参数,如果指定的版本不存在则会报错。Lakehouse 默认保留数据 1 天,可通过data_retention_days
延长。ALTER TABLE ... SET PROPERTIES ('data_retention_days'='7')- 流式写入(Ingestion Service)的数据需等约 1 分钟提交后才在 Stream 中可见(见上方警告);SQL DML 写入的数据任务成功后立即可见。
使用案例
案例1:创建 APPEND_ONLY 模式的 Table Stream
Stream 创建前已有的
(1, 'apple') 不出现;__change_type 固定为 INSERT,UPDATE/DELETE 不记录。
案例2:创建 STANDARD 模式的 Table Stream
STANDARD 模式反映的是净变化(net change):
:被删除 → 一条id=1
行,值为删除前的数据DELETE
:先 INSERT 后 UPDATE → 合并为一条id=2
行,值为最终状态INSERT
(中间的orange
状态不单独出现)banana
如果需要看到
UPDATE_BEFORE / UPDATE_AFTER,需要在 Stream 创建后先消费一次(推进 offset),再执行 UPDATE 操作。
Kafka Table Stream
支持在Kafka外部表上创建表流(Table Stream),实现实时消费Topic数据.
语法
参数说明
:指定消费起始时间点(可选),支持格式:TIMESTAMP AS OF- 明确时间戳:
'2023-01-01 12:00:00' - 时间函数:
CURRENT_TIMESTAMP() - INTERVAL '1' HOUR
- 明确时间戳:
:控制初始数据加载行为:show_initial_rows
:加载从外部表创建时指定的点位到Table Stream指定的点位之间的历史数据true
:从最新点位(false
)开始消费,不加载历史数据(默认值)latest
: 固定为table_stream_mode
,仅处理Kafka新增数据(不支持更新/删除操作)append_only
示例
- Kafka 外部表创建 首先,你需要创建一个 Kafka 外部表。这是创建 Table Stream 的基础。以下是创建 Kafka 外部表的语法:
创建Kafka Connection
创建Kafka外部表
是外部表的名称。external_table_kafka
和key_column
分别代表 Kafka 消息的键和值,其中value_column
是必需的。value_column
指定了使用 Kafka 作为数据源。USING kafka
部分包含了 Kafka 消费者配置,如消费者组 ID (OPTIONS
) 和要订阅的主题 (group_id
)。topics
指定了与 Kafka 的连接配置,这通常包括 Kafka 集群的地址和其他连接参数。CONNECTION pipe_kafka
- 创建 Table Stream
在 Kafka 外部表的基础上,你可以创建一个 Table Stream,用于实时处理 Kafka 数据流。以下是创建 Table Stream 的语法:
Volume Stream
CREATE VOLUME STREAM 语法
用于在 External Volume 的 Directory Table 上创建 Volume Stream,以监听外部对象存储中文件的新增与删除事件。
参数说明
<name>:Volume Stream 的名称。
<volume_name>:要监听的 External Volume 名称。该 Volume 必须满足以下条件:
- 已开启目录功能,即建表时指定 DIRECTORY = (ENABLE = TRUE);
- 已开启递归监听,即建表时指定 RECURSIVE = TRUE。
示例
注意事项
- 创建 Volume Stream 之前,须开启 DIRECTORY = (ENABLE = TRUE) 和 RECURSIVE = TRUE 。
- Volume Stream 本质上是构建在 Directory Table 之上的 Table Stream,消费机制与 Table Stream 完全一致:仅 DML 语句(如 INSERT INTO ... SELECT FROM stream)会推进消费点位,纯 SELECT 查询不会推进。
- 由于对象存储事件通过消息队列传递存在约 1 分钟的延迟,文件上传后需等待约 1 分钟才能在 Volume Stream 中查询到对应的变更记录。
相关指南
- Table Stream 变更数据捕获:Stream 的创建、消费、偏移量推进机制及完整 CDC 示例
