TABLE STREAM 功能介绍
概述
Table Stream 是 Lakehouse 架构中的一种对象,它能够记录对表进行的数据操作语言(DML)的更改,包括插入、更新和删除操作。Table Stream 还提供有关每次更改的元数据信息,使得用户可以利用这些信息采取相应的操作。它可以记录表中两个事务时间点之间的行级别更改,类似于关系数据库的变更数据捕获(CDC)功能。下游系统可以通过 SQL 语句消费 Table Stream,当下游的 DML 操作包含 Table Stream 时,会导致 Table Stream 的位点自动偏移。支持在Table、Dynamic Table、Materialized View、External Table(Kafka)创建Table Stream
Table、Dynamic Table和Materialized View 的TABLE STREAM OFFSET
Table Stream Offset 是一种存储流的偏移量(即源对象的当前事务版本)的机制。偏移量决定了 Table Stream 返回的变化记录的范围。以下是 Table Stream Offset 的一些特点:
- 当创建 Table Stream 时,会对源对象的每一行进行一个初始快照,以初始化一个偏移量。随后,Table Stream 会记录在此快照之后发生的 DML 变化的信息。
- Table Stream 本身不包含任何表数据,只存储源对象的偏移量,并利用源对象的版本历史来返回变化记录。
- Table Stream 的偏移量可以在创建时指定,也可以在消费时更新。指定偏移量的方法使用时间戳。
- Table Stream 的偏移量位于源对象的两个表版本之间。查询 Table Stream 时,会返回在偏移量之后和当前时间之前提交的事务所造成的变化。
Table、Dynamic Table和Materialized View的TABLE 版本控制
在 Lakehouse 中,每当对表进行插入、更新或删除操作时,都会生成一个新的表版本(也称为快照)。这些版本是不可变的,意味着一旦创建,它们就不能被修改。每个版本都包含了自上一版本以来所有数据变更的记录。Table Stream 是基于 TABLE 版本实现的。当创建一个 Stream 时,它会跟踪源表的所有后续版本,并允许用户查询自从创建 Table Stream 以来发生的变更。

上面的例子显示了一个源表在时间线上有10个已提交的版本。Table Stream的偏移目前在表版本v3和v4之间。当查询(或消费)流时,返回的记录包括从表版本v4开始,即表时间线中流偏移之后的版本,到v10结束,即时间线中最近提交的表版本,包括这两个版本之间的最小更改集。
Table、Dynamic Table和Materialized View的TABLE STREAM支持的类型
- STANDARD 模式:在此模式下,可以跟踪源对象的所有 DML 变化,包括插入、更新和删除(包括表截断)。这种模式提供行级别的变化,是通过将所有变化的 delta 数据进行连接加工来提供行级别增量。Table Stream 中的 delta 变化指的是在两个事务时间点之间发生的数据变化。
- APPEND_ONLY 模式:只记录对象的 INSERT 操作的数据。对于 UPDATE 和 DELETE 操作不会进行记录。
Table、Dynamic Table和Materialized View的TABLE STREAM 记录数据的范围
这个时间范围取决于源对象的数据保留期和数据延长期(DATA_RETENTION_DAYS)。数据保留期是指源对象的历史数据可以通过 Time Travel 查询的时间长度。
消费 TABLE STREAM
Table Stream 的消费者是指下游 SQL 中含有 DML 语句则会消费 Table Stream 数据。当下游的 DML 操作包含 Table Stream 时,会使 Table Stream 的位点自动偏移。执行 DQL 操作不会使位点偏移,例如 SELECT 语句。一个源对象可以有多个 stream 同时跟踪它的变化。每个 Table Stream 都可以有不同的 offset,即不同的起始时间点。每个 Table Stream 都可以被不同的消费者(consumer)使用,例如不同的任务、脚本或其他机制。消费者可以通过执行 DML 事务来消费 Table Stream 中的变化数据,从而更新 Table Stream 的 offset。
TABLE STREAM 中的元数据字段
当查询 Table Stream 时,返回的结果集中包含附加的元数据列,包含变化的类型、提交的版本、和提交的时间。具体字段如下:
__change_type
:包含 DML 操作(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)
__commit_version
:数据提交的版本
__commit_timestamp
:数据提交的时间
注意事项
- 创建 Table Stream 之前,必须在基表上执行以下操作开启变更跟踪:
ALTER TABLE table_name set PROPERTIES ('change_tracking' = 'true');
- 通过实时上传数据写入的数据一分钟之后才可以读取,Table Stream 只能读取已经提交的数据。实时任务写入的数据需要等待 1 分钟才能确认,所以 Table Stream 也要等 1 分钟才能看到。
使用案例
APPEND_ONLY 模式案例
-- 创建一个测试表
CREATE TABLE test_table (id INT, name VARCHAR, age INT);
--创建table stream时必须开启
ALTER table test_table set PROPERTIES ('change_tracking' = 'true');
--创建只追加流
CREATE table stream test_stream ON TABLE test_table
WITH
PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
-- 插入一些数据到测试表
INSERT INTO test_table VALUES
(1, 'Alice', 20),
(2, 'Bob', 25),
(3, 'Charlie', 30),
(4, 'David', 35),
(5, 'Eve', 40);
-- 查询测试流,应该返回插入的数据
SELECT * FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
-- 更新一些数据到测试表
UPDATE test_table SET age = age + 5 WHERE id = 1 OR id = 3;
-- 查询测试流,应该返回第一次追加的记录,只记录第一次insert的数据,update的数据不会记录
SELECT * FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
-- 删除一些数据到测试表
DELETE FROM test_table WHERE id = 2 OR id = 4;
-- 查询测试流,应该返回第一次追加的记录,只记录第一次insert的数据,delete不会记录
SELECT * FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
-- 删除原表
DELETE FROM test_table;
-- 查询测试流,应该返回第一次追加的记录,只记录第一次insert的数据,delete不会记录
SELECT * FROM test_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| INSERT | 3 | 2025-04-28 17:37:25.785 | 1 | Alice | 20 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 2 | Bob | 25 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 3 | Charlie | 30 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 4 | David | 35 |
| INSERT | 3 | 2025-04-28 17:37:25.785 | 5 | Eve | 40 |
+---------------+------------------+-------------------------+----+---------+-----+
** 注意**
创建Table Stream,必须在基表上执行
ALTER TABLE table_name set PROPERTIES ('change_tracking' = 'true');
STANDARD模式案例
-- 创建一个测试表
CREATE TABLE test_table_offset (id INT, name VARCHAR, age INT);
--创建table stream时必须开启
ALTER TABLE test_table_offset set PROPERTIES ('change_tracking' = 'true');
CREATE table stream test_table_offset_stream ON TABLE test_table_offset
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD');
-- 插入一些数据到测试表
INSERT INTO test_table_offset VALUES
(1, 'Alice', 20),
(2, 'Bob', 25),
(3, 'Charlie', 30),
(4, 'David', 35),
(5, 'Eve', 40);
-- 查询测试流,应该返回插入的数据
CREATE TABLE test_table_offset_consume (id INT, name VARCHAR, age INT);
--把刚刚插入的数据也同步到目标表,保持一致
INSERT INTO test_table_offset_consume
SELECT id,name,age FROM test_table_offset_stream;
--查看strema是否有数据
SELECT * FROM test_table_offset_stream;
+---------------+------------------+--------------------+----+------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+--------------------+----+------+-----+
-- 更新一些数据到测试表
UPDATE test_table_offset SET age = age + 5 WHERE id = 1 OR id = 3;
-- 查询测试流,应该返回更新的数据,此时数据中会有两条更新前和更新后
SELECT * FROM test_table_offset_stream;
+---------------+------------------+-------------------------+----+---------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+---------+-----+
| UPDATE_AFTER | 4 | 2025-04-28 17:41:18.507 | 1 | Alice | 25 |
| UPDATE_AFTER | 4 | 2025-04-28 17:41:18.507 | 3 | Charlie | 35 |
| UPDATE_BEFORE | 3 | 2025-04-28 17:40:54.626 | 1 | Alice | 20 |
| UPDATE_BEFORE | 3 | 2025-04-28 17:40:54.626 | 3 | Charlie | 30 |
+---------------+------------------+-------------------------+----+---------+-----+
--将更新后的数据消费,使用stream的数据更新目标表
MERGE INTO test_table_offset_consume target USING test_table_offset_stream source_stream ON target.id = source_stream.id WHEN MATCHED
AND source_stream.__change_type = 'UPDATE_AFTER' THEN update set target.age = source_stream.age WHEN MATCHED
AND source_stream.__change_type = 'DELETE' THEN DELETE WHEN NOT MATCHED
AND source_stream.__change_type = 'INSERT' THEN
INSERT VALUES (target.id, target.name, target.age);
--查看更新后的表test_table_offset_consume数据是否正确
SELECT * FROM test_table_offset_consume;
+----+---------+-----+
| id | name | age |
+----+---------+-----+
| 1 | Alice | 25 |
| 3 | Charlie | 35 |
| 2 | Bob | 25 |
| 4 | David | 35 |
| 5 | Eve | 40 |
+----+---------+-----+
--查看table stream中是否还有数据,table stream的数据已经全部消费
SELECT * FROM test_table_offset_stream;
+---------------+------------------+--------------------+----+------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+--------------------+----+------+-----+
-- 删除一些数据到测试表
DELETE FROM test_table_offset WHERE id = 2 OR id = 4;
--查看table stream
SELECT * FROM test_table_offset_stream;
+---------------+------------------+-------------------------+----+-------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+-------------------------+----+-------+-----+
| DELETE | 3 | 2025-04-28 17:40:54.626 | 2 | Bob | 25 |
| DELETE | 3 | 2025-04-28 17:40:54.626 | 4 | David | 35 |
+---------------+------------------+-------------------------+----+-------+-----+
--将删除后的数据消费,使用table stream的数据更新目标表
MERGE INTO test_table_offset_consume target USING test_table_offset_stream source_stream ON target.id = source_stream.id WHEN MATCHED
AND source_stream.__change_type = 'UPDATE_AFTER' THEN update set target.age = source_stream.age WHEN MATCHED
AND source_stream.__change_type = 'DELETE' THEN DELETE WHEN NOT MATCHED
AND source_stream.__change_type = 'INSERT' THEN
INSERT VALUES (target.id, target.name, target.age);
----查看更新后的表test_table_offset_consume数据是否正确
SELECT * FROM test_table_offset_consume;
+----+---------+-----+
| id | name | age |
+----+---------+-----+
| 1 | Alice | 25 |
| 3 | Charlie | 35 |
| 5 | Eve | 40 |
+----+---------+-----+
--查看table stream中是否还有数据,table stream的数据已经全部消费
SELECT * FROM test_table_offset_stream;
+---------------+------------------+--------------------+----+------+-----+
| __change_type | __commit_version | __commit_timestamp | id | name | age |
+---------------+------------------+--------------------+----+------+-----+