Table Stream 变化数据捕获

Table Stream 是云器 Lakehouse 的变更数据捕获(CDC)机制,用于捕获表对象的变化数据。通过定义 Table Stream 对象,你可以基于现有表来记录和追踪数据的变化。

快速定位:这是你需要的吗?

如果你熟悉 Kafka,可以把 Table Stream 理解为"表上的消费位点"——它记录你上次读到了哪个版本,下次查询时只返回那之后的变化行。与 Kafka 消费组不同的是,位点只在你用 DML 语句消费后才前移,不会因为普通查询而丢失数据。

快速选型

场景推荐方案
捕获表的行级变化(INSERT/UPDATE/DELETE)Table Stream(本文)
持续从 Kafka 或 OSS 导入数据到表Pipe
自动维护聚合或转换结果的衍生表动态表(Dynamic Table)

什么是 Table Stream

Table Stream 利用 Lakehouse Table 的多历史版本功能,在创建时记录源表的指定版本(或最新版本)作为初始读取位置。当你查询 Table Stream 时,它将返回从初始位点到当前最新版本的所有变化记录。

核心特点

  • Table Stream 不存储实际数据,仅记录和维护源表的数据版本位点
  • 仅当使用 DML 语句(INSERT/DELETE/UPDATE/MERGE)消费 Table Stream 时,其位点才会更新至最新的数据版本
  • 基于 MVCC 机制,无需额外的日志存储或复杂的 CDC 配置

源表 (Base Table) Table Stream +------------------+ +------------------+ | 版本 1: [A, B] | | 初始位点: 版本 1 | | 版本 2: [A,B,C] | <-- DML --> | 可见变化: [C] | | 版本 3: [A,X,C] | <-- DML --> | 可见变化: [B→X] | | 版本 4: [A,X] | <-- DML --> | 可见变化: [C 删除] | +------------------+ +------------------+

工作原理

1. 创建 Table Stream └─ 记录源表当前版本作为初始位点 2. 源表发生 DML 变更 └─ MetaService 记录新版本 └─ Table Stream 自动感知变化 3. 查询 Table Stream └─ 返回从初始位点到最新版本的所有变化记录 4. 消费 Table Stream(DML 操作) └─ INSERT INTO target SELECT * FROM stream └─ 位点自动更新到最新版本

Table Stream 类型

类型跟踪范围适用场景
STANDARD所有 DML 变化(INSERT、UPDATE、DELETE、TRUNCATE)需要完整捕获数据变更的 ETL 场景
APPEND_ONLY仅 INSERT 操作日志类、事件类仅追加的数据场景

STANDARD 类型详解

STANDARD 类型提供行级别的变化,通过连接加工所有变化的 delta 数据来提供行级别增量。delta 变化反映的是源对象的最新状态,而不是历史变化

例如:

  • 如果在 Table Stream 的 offset 之后,有一行被插入,然后被更新,那么 delta 变化就是更新后的那一行
  • 如果在 Table Stream 的 offset 之后,有一行被插入,然后被删除,那么 delta 变化就是没有这一行

APPEND_ONLY 类型详解

APPEND_ONLY 类型仅记录 INSERT 操作的数据,update 和 delete 操作不会记录。

例如:最初在表中插入了 10 行,然后在点位没有移动的时候执行 delete 操作删除 5 行,Table Stream 仍然记录 10 行操作。

如何判断变更类型

STANDARD 模式下,

__change_type
__change_type
字段有四种取值,
SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
参数无关

__change_type
__change_type
含义说明
INSERT
INSERT
新增行源表执行了 INSERT
UPDATE_BEFORE
UPDATE_BEFORE
更新前的旧值
UPDATE_AFTER
UPDATE_AFTER
成对出现,
__commit_version
__commit_version
为旧版本号
UPDATE_AFTER
UPDATE_AFTER
更新后的新值
UPDATE_BEFORE
UPDATE_BEFORE
成对出现,
__commit_version
__commit_version
为新版本号
DELETE
DELETE
被删除的行源表执行了 DELETE,保留被删行的字段值

UPDATE 操作产生两行记录:

UPDATE_BEFORE
UPDATE_BEFORE
(旧值)和
UPDATE_AFTER
UPDATE_AFTER
(新值),两行
id
id
相同但
__commit_version
__commit_version
不同。

STANDARD 模式示例

-- 源表执行 UPDATE: UPDATE source SET val = 999 WHERE id = 1 SELECT __change_type, __commit_version, id, val FROM my_stream ORDER BY id, __change_type; +-----------------+------------------+----+-----+ | __change_type | __commit_version | id | val | +-----------------+------------------+----+-----+ | UPDATE_AFTER | 4 | 1 | 999 | <-- 新值 | UPDATE_BEFORE | 2 | 1 | 100 | <-- 旧值 +-----------------+------------------+----+-----+ -- 源表执行 DELETE: DELETE FROM source WHERE id = 2 SELECT __change_type, id, val FROM my_stream; +-----------------+----+-----+ | __change_type | id | val | +-----------------+----+-----+ | DELETE | 2 | 200 | <-- 被删除的行,保留字段值 +-----------------+----+-----+

APPEND_ONLY 模式下,

__change_type
__change_type
始终为
INSERT
INSERT
,UPDATE 和 DELETE 操作不产生任何记录。

SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
参数的实际作用

SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
控制的是建 Stream 时表中已有数据是否可见,不影响
__change_type
__change_type
的取值:

SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
行为
'FALSE'
'FALSE'
(默认)
建 Stream 时表中已有的数据不可见,只捕获建 Stream 之后发生的变更
'TRUE'
'TRUE'
建 Stream 时表中已有的数据以
INSERT
INSERT
形式暴露在首次消费中,消费完初始快照后,后续变更正常产生
UPDATE_BEFORE
UPDATE_BEFORE
/
UPDATE_AFTER
UPDATE_AFTER
/
DELETE
DELETE

在 MERGE 语句中消费 STANDARD 模式 Stream 的标准写法

MERGE INTO target t USING source_stream s ON t.id = s.id WHEN MATCHED AND s.__change_type = 'UPDATE_AFTER' THEN UPDATE SET t.name = s.name, t.val = s.val WHEN MATCHED AND s.__change_type = 'DELETE' THEN DELETE WHEN NOT MATCHED AND s.__change_type = 'INSERT' THEN INSERT (id, name, val) VALUES (s.id, s.name, s.val); -- UPDATE_BEFORE 行不需要处理,MERGE 会自动忽略未匹配的条件

使用 Table Stream

前置条件

Table Stream 可以直接在任何普通表上创建,无需额外配置。

创建 Table Stream

Table Stream 可以直接在任何普通表上创建:

-- 创建测试表 CREATE TABLE data_change_test (id INT, name STRING); -- 插入初始数据 INSERT INTO data_change_test VALUES (1, 'apple'); -- 创建 APPEND_ONLY 类型的 Table Stream CREATE TABLE STREAM data_change_test_stream ON TABLE data_change_test WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

查询 Table Stream

-- 插入新数据 INSERT INTO data_change_test VALUES (2, 'banana'); -- 查询源表(2 条记录) SELECT * FROM data_change_test; +----+--------+ | id | name | +----+--------+ | 1 | apple | | 2 | banana | +----+--------+ -- 查询 Table Stream(仅 1 条新增记录) SELECT * FROM data_change_test_stream; +----+--------+ | id | name | +----+--------+ | 2 | banana | +----+--------+

消费 Table Stream

使用 DML 语句消费 Table Stream 时,位点会自动更新:

-- 创建目标表 CREATE TABLE data_change_test_offset (id INT, name STRING); -- 消费 Stream 数据(位点自动更新) INSERT INTO data_change_test_offset SELECT id, name FROM data_change_test_stream; -- 再次查询 Stream(数据已被消费,返回空) SELECT * FROM data_change_test_stream; +----+------+ | id | name | +----+------+ +----+------+

删除 Table Stream

DROP TABLE STREAM IF EXISTS data_change_test_stream;

数据变化可见性时效

数据写入方式Table Stream 可见时效
DML(INSERT/UPDATE/DELETE)任务成功结束后立即可见
批量导入(Bulkload)任务成功结束后立即可见
流式导入(Ingestion Service)默认 1 分钟提交变化后可见

Table Stream 与动态表的关系

维度Table Stream动态表 (Dynamic Table)
定位底层 CDC 机制,捕获表的变化数据高级数据加工特性,基于增量计算实现数据转换
是否存储数据否,仅记录位点是,存储计算结果
是否执行计算否,仅返回变化记录是,执行定义的 SQL 逻辑
刷新方式查询时返回变化,消费时更新位点按配置的间隔自动刷新
典型场景变化数据捕获、实时数据同步数据仓库分层加工、指标汇总

协同工作模式

在实际业务中,Table Stream 与动态表经常协同工作:

源表 --[Table Stream]--> 捕获变化 --[Dynamic Table]--> 转换聚合 --> 目标表

  • ETL 数据处理链:先在源表创建 Table Stream 捕获变化数据,随后利用动态表对这些变化数据执行转换与聚合操作
  • 多级增量计算:搭建一系列动态表,让每个动态表依次处理前一个表产生的增量数据
  • 实时数据分析:借助 Table Stream 实时捕获业务系统数据变动,再通过动态表开展实时分析与计算

最佳实践

1. 直接创建

Table Stream 可直接在任何普通表上创建,无需额外配置:

-- 直接创建 Table Stream CREATE TABLE STREAM my_stream ON TABLE source_table WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

2. 选择合适的类型

  • 如果只需要捕获新增数据(如日志表、事件表),使用 APPEND_ONLY 类型,性能更优
  • 如果需要完整的变更数据(包括更新和删除),使用 STANDARD 类型

3. 及时消费

Table Stream 的变化数据依赖于源表的 Time Travel 保留周期。如果长时间不消费,源表的历史版本可能被清理,导致 Table Stream 无法获取完整的变化数据。

4. 监控位点状态

通过

DESC TABLE STREAM
DESC TABLE STREAM
查看 Table Stream 的当前位点状态:

DESC TABLE STREAM my_stream; +------------------------+---------------------------+ | info_name | info_value | +------------------------+---------------------------+ | name | my_stream | | creator | qiliang | | created_time | 2026-05-19 22:29:26.01 | | last_modified_time | 2026-05-19 22:29:26.017 | | workspace | quick_start | | base_tables | quick_start.source_table | | stale | false | | offset | -1 | | current_offset_time | 2026-05-19 22:28:47.669 | | current_offset_version | 1 | +------------------------+---------------------------+

  • stale = true
    stale = true
    表示源表的历史版本已被清理,Table Stream 可能无法获取完整变化
  • offset
    offset
    为 -1 表示尚未消费任何数据

注意事项

  • Table Stream 可直接在任何普通表上创建,无需额外配置
  • 删除源表时,关联的 Table Stream 将失效
  • Table Stream 的位点更新仅在 DML 消费时触发,仅查询不会更新位点
  • STANDARD 模式下 UPDATE 产生
    UPDATE_BEFORE
    UPDATE_BEFORE
    +
    UPDATE_AFTER
    UPDATE_AFTER
    两行,DELETE 产生
    DELETE
    DELETE
    记录,与
    SHOW_INITIAL_ROWS
    SHOW_INITIAL_ROWS
    参数无关
  • CREATE OR REPLACE TABLE
    CREATE OR REPLACE TABLE
    会让关联 Stream 变 stale
    :对源表执行
    CREATE OR REPLACE TABLE
    CREATE OR REPLACE TABLE
    (重建表)会清除表的历史版本,导致基于该表的所有 Table Stream 立即变为 stale 状态(
    stale = true
    stale = true
    ),无法再获取完整变化数据。如需修改表结构,优先使用
    ALTER TABLE
    ALTER TABLE
    ,而不是
    CREATE OR REPLACE TABLE
    CREATE OR REPLACE TABLE

相关文档

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询