-- 源表执行 UPDATE: UPDATE source SET val = 999 WHERE id = 1
SELECT __change_type, __commit_version, id, val FROM my_stream ORDER BY id, __change_type;
+-----------------+------------------+----+-----+
| __change_type | __commit_version | id | val |
+-----------------+------------------+----+-----+
| UPDATE_AFTER | 4 | 1 | 999 | <-- 新值
| UPDATE_BEFORE | 2 | 1 | 100 | <-- 旧值
+-----------------+------------------+----+-----+
-- 源表执行 DELETE: DELETE FROM source WHERE id = 2
SELECT __change_type, id, val FROM my_stream;
+-----------------+----+-----+
| __change_type | id | val |
+-----------------+----+-----+
| DELETE | 2 | 200 | <-- 被删除的行,保留字段值
+-----------------+----+-----+
APPEND_ONLY 模式下,
__change_type
__change_type
始终为
INSERT
INSERT
,UPDATE 和 DELETE 操作不产生任何记录。
SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
参数的实际作用
SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
控制的是建 Stream 时表中已有数据是否可见,不影响
__change_type
__change_type
的取值:
SHOW_INITIAL_ROWS
SHOW_INITIAL_ROWS
行为
'FALSE'
'FALSE'
(默认)
建 Stream 时表中已有的数据不可见,只捕获建 Stream 之后发生的变更
'TRUE'
'TRUE'
建 Stream 时表中已有的数据以
INSERT
INSERT
形式暴露在首次消费中,消费完初始快照后,后续变更正常产生
UPDATE_BEFORE
UPDATE_BEFORE
/
UPDATE_AFTER
UPDATE_AFTER
/
DELETE
DELETE
在 MERGE 语句中消费 STANDARD 模式 Stream 的标准写法:
MERGE INTO target t
USING source_stream s ON t.id = s.id
WHEN MATCHED AND s.__change_type = 'UPDATE_AFTER'
THEN UPDATE SET t.name = s.name, t.val = s.val
WHEN MATCHED AND s.__change_type = 'DELETE'
THEN DELETE
WHEN NOT MATCHED AND s.__change_type = 'INSERT'
THEN INSERT (id, name, val) VALUES (s.id, s.name, s.val);
-- UPDATE_BEFORE 行不需要处理,MERGE 会自动忽略未匹配的条件
使用 Table Stream
前置条件
Table Stream 可以直接在任何普通表上创建,无需额外配置。
💡 提示:如果源表已开启
change_tracking
change_tracking
,Table Stream 的行为保持一致。
change_tracking
change_tracking
不是创建 Table Stream 的前置条件。
创建 Table Stream
Table Stream 可以直接在任何普通表上创建:
-- 创建测试表
CREATE TABLE data_change_test (id INT, name STRING);
-- 插入初始数据
INSERT INTO data_change_test VALUES (1, 'apple');
-- 创建 APPEND_ONLY 类型的 Table Stream
CREATE TABLE STREAM data_change_test_stream ON TABLE data_change_test
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');
查询 Table Stream
-- 插入新数据
INSERT INTO data_change_test VALUES (2, 'banana');
-- 查询源表(2 条记录)
SELECT * FROM data_change_test;
+----+--------+
| id | name |
+----+--------+
| 1 | apple |
| 2 | banana |
+----+--------+
-- 查询 Table Stream(仅 1 条新增记录)
SELECT * FROM data_change_test_stream;
+----+--------+
| id | name |
+----+--------+
| 2 | banana |
+----+--------+
消费 Table Stream
使用 DML 语句消费 Table Stream 时,位点会自动更新:
-- 创建目标表
CREATE TABLE data_change_test_offset (id INT, name STRING);
-- 消费 Stream 数据(位点自动更新)
INSERT INTO data_change_test_offset SELECT id, name FROM data_change_test_stream;
-- 再次查询 Stream(数据已被消费,返回空)
SELECT * FROM data_change_test_stream;
+----+------+
| id | name |
+----+------+
+----+------+
⚠️ WHERE 条件不阻止 offset 推进:如果消费 Stream 的 DML 语句带有 WHERE 条件,被过滤掉的数据不会保留到下次消费——offset 照样推进到最新位置,过滤掉的变更记录永久丢失。如果只想处理部分数据但不丢弃其余数据,应先全量消费到中间表,再从中间表筛选:
-- 正确做法:先全量消费到中间表
INSERT INTO staging SELECT * FROM my_stream;
-- 再从中间表按条件处理
INSERT INTO target SELECT * FROM staging WHERE value > 100;
删除 Table Stream
DROP TABLE STREAM IF EXISTS data_change_test_stream;