云器 Lakehouse Table Stream 最佳实践指南

Table Stream 在企业数据组织中的角色

在现代数据驱动型企业中,数据变更的实时捕获和处理已成为关键能力。企业数据组织通常面临以下挑战:

  • 跨系统数据同步延迟导致的决策滞后
  • 复杂 ETL 流程中的增量更新困难
  • 数据变更历史追踪和审计的复杂性
  • 实时数据集成和事件驱动架构的实现难度

云器 Lakehouse 的 Table Stream 功能正是为解决这些挑战而设计的核心组件。它在企业数据组织中扮演着关键角色:

  1. 数据集成中枢:作为变更数据捕获(CDC)的核心机制,促进不同系统间的实时数据流动
  2. 数据质量保障:提供数据变更的可追溯性,支持数据血缘和影响分析
  3. 实时分析基础:为实时数据仓库、即时报表和仪表盘提供数据变更流
  4. 事件驱动触发器:作为事件源,驱动下游业务流程和自动化操作
  5. 数据治理支柱:支持合规要求,记录敏感数据的变更历史

在数据架构中,Table Stream 连接了 OLTP 系统和分析系统,使企业能够建立“流批一体”的现代数据平台,提升数据时效性和业务响应速度。

简介

1 什么是Table Stream

Table Stream 是云器 Lakehouse 架构中的核心功能,提供变更数据捕获 (CDC) 能力,用于记录表中数据的插入、更新和删除操作。它创建了一个“变更表”,使用户能够查询和消费两个事务时间点之间的行级变更记录。

2 核心功能

  • 变更捕获:记录表级 DML 操作 (INSERT、UPDATE、DELETE)
  • 元数据记录:提供每次变更的版本、时间戳等元数据
  • 增量处理:支持增量读取和处理数据变更
  • 消费机制:支持通过 DML 操作消费变更数据并移动 offset

3 适用场景

  • 数据同步和复制
  • 实时数据集成
  • 增量 ETL/ELT 流程
  • 审计和数据治理
  • 事件驱动架构

准备工作

1 表配置要求

在使用Table Stream之前,必须确保源表已正确配置:

-- 创建源表示例 CREATE TABLE source_table ( id INT, name STRING, value DOUBLE, updated_at TIMESTAMP );

2 创建 Table Stream

Table Stream 可以直接在任何普通表上创建,无需额外配置:

-- 创建源表示例 CREATE TABLE source_table ( id INT, name STRING, value DOUBLE, updated_at TIMESTAMP ); -- 直接创建 Table Stream CREATE TABLE STREAM source_stream ON TABLE source_table WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

3 准备目标表

如果计划将Stream数据写入目标表,提前创建具有兼容结构的目标表:

-- 创建目标表 CREATE TABLE target_table ( id INT, name STRING, value DOUBLE, updated_at TIMESTAMP );

创建和配置

1 基本语法

创建Table Stream的基本语法:

CREATE TABLE STREAM stream_name ON TABLE source_table [COMMENT 'stream description'] WITH PROPERTIES ( 'TABLE_STREAM_MODE' = 'STANDARD|APPEND_ONLY', ['SHOW_INITIAL_ROWS' = 'TRUE|FALSE'] );

2 重要参数

2.1 TABLE_STREAM_MODE

  • STANDARD:捕获所有DML操作(INSERT、UPDATE、DELETE),反映表的当前状态
  • APPEND_ONLY:只捕获INSERT操作,即使行被更新或删除也保留原始INSERT记录

2.2 SHOW_INITIAL_ROWS

  • TRUE:首次消费时返回创建Stream时表中的所有现有行
  • FALSE(默认):首次消费只返回创建Stream后的新变更

3 时间点设置

可以指定Stream开始捕获变更的时间点:

CREATE TABLE STREAM stream_name ON TABLE source_table TIMESTAMP AS OF current_timestamp() WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

最佳实践:使用

current_timestamp()
current_timestamp()
或具体的时间戳字符串,避免使用复杂的时间表达式。

4 注释添加

为Stream添加描述性注释:

CREATE TABLE STREAM stream_name ON TABLE source_table COMMENT '捕获source_table的数据变更' WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

注意:使用正确的语法

COMMENT '注释内容'
COMMENT '注释内容'
,而非
COMMENT = '注释内容'
COMMENT = '注释内容'

5 多消费者模式

一个 Stream 只能被一个消费者完整消费。当任务 A 通过 DML 消费了 Stream,偏移量就推进了,任务 B 再查同一个 Stream 时,那批变更数据已经不在了。

如果多个下游任务(或不同的下游系统)都需要消费同一张表的变更,为每个消费者单独创建一个 Stream

-- 开启源表变更跟踪 ALTER TABLE orders SET PROPERTIES ('change_tracking' = 'true'); -- 为数据仓库同步任务创建 Stream CREATE TABLE STREAM orders_stream_for_dw ON TABLE orders WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD'); -- 为实时通知任务创建 Stream(只关心新增订单) CREATE TABLE STREAM orders_stream_for_notify ON TABLE orders WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY'); -- 为审计任务创建 Stream CREATE TABLE STREAM orders_stream_for_audit ON TABLE orders WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

每个 Stream 独立维护自己的偏移量,A 消费不影响 B 和 C。Stream 本身只存储偏移量,不复制表数据,创建多个 Stream 的额外存储成本极低。

使用不同模式

1 STANDARD模式

推荐用途:需要表的完整当前状态,包括更新和删除操作。

CREATE TABLE STREAM standard_stream ON TABLE source_table WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');

特点:

  • 准确反映表的当前状态
  • 更新会显示最新值
  • 已删除的行不会出现在结果中

2 APPEND_ONLY模式

推荐用途:需要保留所有插入记录,包括后续被更新或删除的记录。

CREATE TABLE STREAM append_stream ON TABLE source_table WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

特点:

  • 记录所有INSERT操作
  • 不反映UPDATE和DELETE操作
  • 即使行被删除,原始INSERT记录仍会保留

3 模式选择指南

需求推荐模式
数据同步(保持目标与源一致)STANDARD
审计所有插入记录APPEND_ONLY
增量ETL流程STANDARD
历史记录保留APPEND_ONLY

消费和处理数据

1 偏移量推进规则

理解偏移量(offset)何时推进,是正确使用 Table Stream 的基础。

核心规则:只有包含该 Stream 的 DML 事务成功提交后,偏移量才推进。

操作偏移量是否推进
SELECT * FROM stream
SELECT * FROM stream
不推进
INSERT INTO t SELECT ... FROM stream
INSERT INTO t SELECT ... FROM stream
(成功提交)
推进
INSERT INTO t SELECT ... FROM stream WHERE ...
INSERT INTO t SELECT ... FROM stream WHERE ...
(成功提交)
推进(即使 WHERE 过滤了部分数据)
包含 Stream 的事务回滚不推进
包含 Stream 的事务失败不推进

WHERE 条件不影响偏移量推进:即使 WHERE 子句过滤掉了大部分数据,只要 DML 事务成功提交,Stream 的全部数据都会被消费,偏移量推进到当前位置。

-- 示例:只处理 value > 100 的变更 -- 但 Stream 中所有变更(包括 value <= 100 的)都会被消费 INSERT INTO high_value_target SELECT id, name, value FROM my_stream WHERE value > 100; -- 再次查询 Stream,value <= 100 的变更也不见了 SELECT COUNT(*) FROM my_stream; -- 返回 0(或只有新产生的变更)

如果你只想处理部分数据且不丢失其余数据,应该先全量消费到中间表,再从中间表筛选:

-- 先全量消费到中间表 INSERT INTO staging_table SELECT * FROM my_stream; -- 再从中间表按条件处理 INSERT INTO high_value_target SELECT id, name, value FROM staging_table WHERE value > 100;

2 查询Stream数据

-- 查询Stream中的变更数据(不推进偏移量) SELECT * FROM my_stream;

重要:仅使用 SELECT 查询不会移动 Stream 的 offset,可以反复查看同一批变更数据。

3 消费和移动Offset

要移动 Stream 的 offset(消费数据),必须使用 DML 操作:

-- 将Stream数据插入目标表(会移动offset) INSERT INTO target_table SELECT id, name, value, updated_at FROM my_stream;

4 消费模式

4.1 全量消费

-- 消费Stream中的所有变更数据 INSERT INTO target_table SELECT id, name, value, updated_at FROM my_stream;

4.2 条件消费(注意数据丢失风险)

-- 仅消费特定条件的变更数据 -- 警告:Stream 中所有数据的 offset 都会推进,value <= 100 的变更会被丢弃 INSERT INTO target_table SELECT id, name, value, updated_at FROM my_stream WHERE value > 100;

注意:即使使用 WHERE 条件,所有 Stream 数据的 offset 仍会移动。如果需要保留被过滤的数据,先全量消费到中间表。

5 验证消费状态

通过再次查询Stream,验证数据是否被消费:

-- 验证消费后的Stream状态 SELECT COUNT(*) FROM my_stream;

如果消费成功,COUNT 应该为 0 或只包含新的变更数据。

元数据字段使用

1 可用元数据字段

Table Stream返回的结果包含以下元数据字段:

  • __change_type
    __change_type
    :变更类型
  • __commit_version
    __commit_version
    :提交版本
  • __commit_timestamp
    __commit_timestamp
    :提交时间戳

2 变更类型说明

STANDARD 模式下,

__change_type
__change_type
字段会出现以下四种值:

__change_type
__change_type
含义说明
INSERT
INSERT
新增行源表执行了 INSERT
UPDATE_BEFORE
UPDATE_BEFORE
更新前的旧值
UPDATE_AFTER
UPDATE_AFTER
成对出现,
__commit_version
__commit_version
为旧版本号
UPDATE_AFTER
UPDATE_AFTER
更新后的新值
UPDATE_BEFORE
UPDATE_BEFORE
成对出现,
__commit_version
__commit_version
为新版本号
DELETE
DELETE
被删除的行源表执行了 DELETE,保留被删行的字段值

UPDATE 操作产生两行记录:

UPDATE_BEFORE
UPDATE_BEFORE
(旧值)和
UPDATE_AFTER
UPDATE_AFTER
(新值),两行
id
id
相同但
__commit_version
__commit_version
不同。这与
SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
参数无关
,两种设置下行为一致。

SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
控制的是建 Stream 时表里已有数据是否可见,不影响
__change_type
__change_type
的取值:

  • FALSE
    FALSE
    (默认):建 Stream 时表已有的数据不可见,只捕获建 Stream 之后发生的变更
  • TRUE
    TRUE
    :建 Stream 时表已有的数据以
    INSERT
    INSERT
    形式暴露,消费完初始快照后,后续变更正常产生
    UPDATE_BEFORE
    UPDATE_BEFORE
    /
    UPDATE_AFTER
    UPDATE_AFTER
    /
    DELETE
    DELETE

在 MERGE 语句中使用 Stream 的标准写法

MERGE INTO target t USING source_stream s ON t.id = s.id WHEN MATCHED AND s.__change_type = 'UPDATE_AFTER' THEN UPDATE SET t.name = s.name, t.value = s.value WHEN MATCHED AND s.__change_type = 'DELETE' THEN DELETE WHEN NOT MATCHED AND s.__change_type = 'INSERT' THEN INSERT (id, name, value) VALUES (s.id, s.name, s.value); -- UPDATE_BEFORE 行不需要处理,MERGE 会自动忽略未匹配的条件

APPEND_ONLY 模式下,

__change_type
__change_type
始终为
INSERT
INSERT
,UPDATE 和 DELETE 操作不产生任何记录。

3 使用元数据实现增量处理

-- 基于提交版本筛选 SELECT * FROM my_stream WHERE __commit_version > last_processed_version; -- 基于提交时间筛选 SELECT * FROM my_stream WHERE __commit_timestamp > TIMESTAMP '2025-05-01 00:00:00';

4 元数据字段最佳实践

  • __change_type
    __change_type
    SHOW_INITIAL_ROWS
    SHOW_INITIAL_ROWS
    无关
    :无论默认还是
    TRUE
    TRUE
    模式,STANDARD 模式下 UPDATE 都产生
    UPDATE_BEFORE
    UPDATE_BEFORE
    /
    UPDATE_AFTER
    UPDATE_AFTER
    ,DELETE 产生
    DELETE
    DELETE
  • MERGE 统一使用完整写法:按
    __change_type
    __change_type
    区分
    UPDATE_AFTER
    UPDATE_AFTER
    /
    DELETE
    DELETE
    /
    INSERT
    INSERT
    UPDATE_BEFORE
    UPDATE_BEFORE
    行忽略即可
  • 使用
    __commit_version
    __commit_version
    __commit_timestamp
    __commit_timestamp
    跟踪变更顺序
  • 保存消费的最大版本号,用于故障恢复

实际应用场景

1 实时数据同步

-- 定期执行,将变更同步到目标表 INSERT INTO target_table SELECT id, name, value, updated_at FROM source_stream;

可结合定时任务或触发器实现自动同步。

2 增量ETL流程

-- 增量提取、转换并加载数据 INSERT INTO dwh_fact_table (dimension_id, metric_value, load_date) SELECT dim.dimension_id, stream.value, current_date() FROM source_stream stream JOIN dimension_table dim ON stream.id = dim.source_id;

3 事件驱动处理

-- 检测特定事件并触发处理 CREATE OR REPLACE PROCEDURE process_high_value_changes() AS BEGIN -- 检查是否有高价值变更 DECLARE high_value_changes CURSOR FOR SELECT * FROM value_stream WHERE value > 1000; -- 处理这些变更 FOR change IN high_value_changes DO -- 执行处理逻辑 INSERT INTO high_value_alerts VALUES (change.id, change.value, current_timestamp()); END FOR; -- 消费所有变更 INSERT INTO processed_changes SELECT * FROM value_stream; END;

4 审计跟踪

-- 捕获所有变更用于审计 CREATE TABLE STREAM audit_stream ON TABLE sensitive_data WITH PROPERTIES ( 'TABLE_STREAM_MODE' = 'APPEND_ONLY', 'SHOW_INITIAL_ROWS' = 'TRUE' ); -- 定期归档到审计表 INSERT INTO audit_history SELECT *, __commit_timestamp AS audit_timestamp, __commit_version AS change_version FROM audit_stream;

性能优化

1 减少数据体积

  • 只选择必要的列而非
    SELECT *
    SELECT *
  • 在源表上设置适当的保留期
  • 定期消费Stream数据以避免累积

2 批量处理

-- 批量消费多个Stream并合并处理 INSERT INTO consolidated_target SELECT 'customers' AS source, id, name, NULL AS product_id, NULL AS order_id, __commit_timestamp FROM customer_stream UNION ALL SELECT 'products' AS source, id, name, product_id, NULL AS order_id, __commit_timestamp FROM product_stream UNION ALL SELECT 'orders' AS source, id, NULL AS name, NULL AS product_id, order_id, __commit_timestamp FROM order_stream;

3 并行处理

将大型Stream拆分为多个较小的部分并行处理:

-- 分区1处理 INSERT INTO target_partition_1 SELECT * FROM source_stream WHERE MOD(id, 4) = 0; -- 分区2处理 INSERT INTO target_partition_2 SELECT * FROM source_stream WHERE MOD(id, 4) = 1; -- 以此类推...

4 频率优化

  • 高变更率表:更频繁地消费Stream
  • 低变更率表:降低消费频率
  • 关键表:实时或近实时消费
  • 非关键表:批量定期消费

常见问题和解决方案

1 Stream不捕获变更

问题:创建 Stream 后未能捕获表变更。

解决方案

  1. 确认 DML 操作在 Stream 创建后执行
  2. 验证是否有足够权限
  3. 确认流式写入的数据已提交(实时写入需等待约 1 分钟)

2 Stream 中未出现 UPDATE_BEFORE/UPDATE_AFTER/DELETE

问题:查询 STANDARD 模式的 Stream,只看到 INSERT,没有 UPDATE 或 DELETE 记录。

原因:UPDATE/DELETE 操作发生在 Stream 创建之前,Stream 只能捕获创建后的变更。

__change_type
__change_type
的取值(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE)与
SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
参数无关。

解决方案

  1. 确认 UPDATE/DELETE 操作是在 Stream 创建之后执行的
  2. 如需捕获历史变更,使用
    TIMESTAMP AS OF
    TIMESTAMP AS OF
    将 Stream 的起始位点设置到操作发生之前的时间点
  3. 使用
    DESC TABLE STREAM
    DESC TABLE STREAM
    检查
    current_offset_time
    current_offset_time
    ,确认 Stream 的当前位点是否早于你期望捕获的变更时间

3 重复消费数据

问题:重复运行消费逻辑导致目标表出现重复数据。

解决方案

  1. 使用MERGE语句替代INSERT
  2. 实现幂等性处理
  3. 记录最后消费的版本和时间戳

-- 幂等消费示例 MERGE INTO target_table t USING my_stream s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.name = s.name, t.value = s.value, t.updated_at = s.updated_at WHEN NOT MATCHED THEN INSERT (id, name, value, updated_at) VALUES (s.id, s.name, s.value, s.updated_at);

4 消费后未移动Offset

问题:消费后再次查询仍返回相同数据。

解决方案

  1. 确保使用 DML 操作消费数据(INSERT、UPDATE、MERGE)
  2. 不要仅使用 SELECT 查询,这不会移动 offset
  3. 检查 DML 操作是否成功提交

最佳实践总结

1 设计原则

  1. 直接创建:Table Stream 可直接在任何普通表上创建,无需额外配置
  2. 选择合适的模式:根据需求选择 STANDARD 或 APPEND_ONLY 模式
  3. 多消费者各建独立 Stream:不同下游任务不能共用同一个 Stream,否则先消费的任务会让后消费的任务看不到数据
  4. DML 才推进偏移量:SELECT 不消费数据;WHERE 条件不阻止偏移量推进,过滤掉的数据会被丢弃
  5. 定期消费:不要让 Stream 累积过多数据,消费频率应远短于源表的
    DATA_RETENTION_DAYS
    DATA_RETENTION_DAYS
    (默认 1 天)
  6. 理解
    SHOW_INITIAL_ROWS
    SHOW_INITIAL_ROWS
    的影响
    :此参数控制建 Stream 时表中已有数据是否可见,不影响
    __change_type
    __change_type
    的取值。无论何种设置,STANDARD 模式下 UPDATE 均产生
    UPDATE_BEFORE
    UPDATE_BEFORE
    /
    UPDATE_AFTER
    UPDATE_AFTER
    ,DELETE 均产生
    DELETE
    DELETE

2 使用清单

  • 确认源表已存在且结构正确
  • 选择合适的 Stream 模式(STANDARD / APPEND_ONLY)
  • 多个下游消费者各自创建独立的 Stream
  • 根据需求选择
    SHOW_INITIAL_ROWS
    SHOW_INITIAL_ROWS
    • 'FALSE'
      'FALSE'
      (默认):建 Stream 时表中已有数据不可见,只捕获建 Stream 之后的变更
    • 'TRUE'
      'TRUE'
      :建 Stream 时表中已有数据以
      INSERT
      INSERT
      形式暴露在首次消费中
    • 两种设置下,STANDARD 模式的
      __change_type
      __change_type
      行为完全一致(UPDATE 产生
      UPDATE_BEFORE
      UPDATE_BEFORE
      /
      UPDATE_AFTER
      UPDATE_AFTER
      ,DELETE 产生
      DELETE
      DELETE
  • 使用 DML 操作消费数据(不要只用 SELECT)
  • 确认 WHERE 条件过滤的数据是否可以丢弃(如不能,先全量消费到中间表)
  • 实现幂等性消费机制(MERGE 替代 INSERT)
  • 消费频率 < 源表 DATA_RETENTION_DAYS(默认 1 天),避免 Stream 失效
  • 监控 Stream 积压量和消费延迟
  • 实现错误处理和重试逻辑

3 成功实现的关键

  • 理解机制:掌握Stream的工作原理和限制
  • 适当测试:在生产环境部署前充分测试
  • 定期维护:监控和优化Stream性能
  • 记录状态:跟踪消费状态,确保数据一致性
  • 容错设计:考虑故障恢复和边缘情况

遵循这些最佳实践,你将能够充分利用云器Lakehouse Table Stream功能,构建高效可靠的数据变更捕获和处理流程。

参考文档

  1. 云器 Table Stream 文档 - 功能描述和语法参考
  2. 云器 Table Stream 创建语法 - 详细的创建语法和参数说明
  3. 变更数据捕获 (CDC) 最佳实践 - 变更数据捕获相关的一般性最佳实践
  4. 云器 SQL参考手册 - 完整的SQL语法参考,包括Table Stream相关操作

注:本指南基于 2025 年 5 月的云器 Lakehouse 版本测试结果,后续版本可能有所变化。请定期检查官方文档以获取最新信息。

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询