CREATE TABLE STREAM

创建 Table Stream,用于捕获表的增量变更(INSERT / UPDATE / DELETE)。可以把 Stream 理解为"表上的消费游标"——它记录你上次读到哪个版本,下次查询时只返回那之后的变化行。

选型定位:捕获表的行级变更,驱动增量 ETL 或跨表同步;如果只需要自动维护加工结果,用 Dynamic Table;如果只需要持续从外部导入数据,用 Pipe。

Table Stream 语法

支持在 Table、Dynamic Table、Materialized View、External Table(Kafka)上创建 Table Stream;也支持在 External Volume 上创建 Volume Stream,用于监听对象存储中的文件变更事件。

Table、Dynamic Table、Materialized View 创建 Table Stream 语法

CREATE [OR REPLACE] TABLE STREAM [IF NOT EXISTS] <name> ON TABLE <table_name> [TIMESTAMP AS OF timestamp_expression] [COMMENT = '<string_literal>'] WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD','SHOW_INITIAL_ROWS'='TRUE|FALSE');

  • <name>
    <name>
    :Table Stream的名称。

  • <table_name>
    <table_name>
    :要获取增量的基表。不支持view。

  • TIMESTAMP AS OF timestamp_expression
    TIMESTAMP AS OF timestamp_expression
    :(可选)指定Table Stream应从底层表开始接收更新的时间戳表达式。如果省略此参数,Table Stream将从当前时间开始接收更新。

    • timestamp_expression
      timestamp_expression
      返回结果是一个标准的时间戳类型的表达式,TIMESTAMP AS OF指定的最早时间戳取决TIME TRAVEL(data_retention_days)参数,如果指定的版本不存在则会报错。如果未指定则使用当前时间戳的版本数据,例如:
      • '2023-11-07 14:49:18'
        '2023-11-07 14:49:18'
        ,即可以强制转换为时间戳的字符串。
      • cast('2023-11-07 14:49:18 Asia/Shanghai' as timestamp)
        cast('2023-11-07 14:49:18 Asia/Shanghai' as timestamp)
      • current_timestamp() - interval '12' hours
        current_timestamp() - interval '12' hours
      • 本身就是时间戳或可强制转换为时间戳的任何其他表达式。
  • COMMENT
    COMMENT
    :(可选)Table Stream的注释。

  • 'TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD'
    'TABLE_STREAM_MODE' = 'APPEND_ONLY|STANDARD'
    :(必选)值二选一,APPEND_ONLY和STANDARD。

    • APPEND_ONLY只记录对象的INSERT操作的数据。Update和delete操作不会记录。例如,最初在表中插入了10行,然后在点位没有移动的时候执行delete操作删除5行,Table Stream仍然记录10行操作。
    • STANDARD模式:STANDARD模式下可以跟踪源对象的所有DML变化,包括插入、更新和删除(包括表截断)。这种提供行级别的变化是通过将所有变化的delta数据进行连接加工来提供行级别增量。Table Stream中的delta变化指的是在两个事务时间点之间发生的数据变化。例如,如果在Table Stream的offset之后,有一个行被插入,然后被更新,那么delta变化就是一个新的行。如果在stream的offset之后,有一个行被插入,然后被删除,那么delta变化就是没有这个行。换句话说,delta变化会反映源对象的最新状态,而不是历史变化。
  • SHOW_INITIAL_ROWS:可选参数,控制建 Stream 时表中已有数据的可见性:

    • 'FALSE'
      'FALSE'
      (默认):建 Stream 时表中已有的数据不可见,只捕获建 Stream 之后发生的变更。
    • 'TRUE'
      'TRUE'
      :建 Stream 时表中已有的数据以
      INSERT
      INSERT
      形式暴露在首次消费中,消费完初始快照后,后续变更正常产生
      UPDATE_BEFORE
      UPDATE_BEFORE
      /
      UPDATE_AFTER
      UPDATE_AFTER
      /
      DELETE
      DELETE

    注意

    SHOW_INITIAL_ROWS
    SHOW_INITIAL_ROWS
    只影响建 Stream 时历史数据是否可见,不影响
    __change_type
    __change_type
    的取值
    。无论该参数为何值,STANDARD 模式下 UPDATE 均产生
    UPDATE_BEFORE
    UPDATE_BEFORE
    +
    UPDATE_AFTER
    UPDATE_AFTER
    两行,DELETE 均产生
    DELETE
    DELETE
    记录。

注意事项

  • TIMESTAMP AS OF
    TIMESTAMP AS OF
    指定的最早时间戳取决于 TIME TRAVEL
    data_retention_days
    data_retention_days
    )参数,如果指定的版本不存在则会报错。Lakehouse 默认保留数据 1 天,可通过
    ALTER TABLE ... SET PROPERTIES ('data_retention_days'='7')
    ALTER TABLE ... SET PROPERTIES ('data_retention_days'='7')
    延长。
  • 流式写入(Ingestion Service)的数据需等约 1 分钟提交后才在 Stream 中可见(见上方警告);SQL DML 写入的数据任务成功后立即可见。

使用案例

案例1:创建 APPEND_ONLY 模式的 Table Stream

-- 创建测试表 CREATE TABLE data_change_test (id INT, name STRING); INSERT INTO data_change_test VALUES (1, 'apple'); -- 创建 Stream(只捕获创建后的新增行) CREATE TABLE STREAM data_change_test_stream ON TABLE data_change_test WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY'); -- 插入测试数据 INSERT INTO data_change_test VALUES (2, 'banana'); -- 查看 Stream 数据 SELECT __change_type, id, name FROM data_change_test_stream;

+-----------------+----+--------+ | __change_type | id | name | +-----------------+----+--------+ | INSERT | 2 | banana | +-----------------+----+--------+

Stream 创建前已有的

(1, 'apple')
(1, 'apple')
不出现;
__change_type
__change_type
固定为
INSERT
INSERT
,UPDATE/DELETE 不记录。

案例2:创建 STANDARD 模式的 Table Stream

-- 创建测试表 CREATE TABLE data_change_test (id INT, name STRING); INSERT INTO data_change_test VALUES (1, 'apple'); -- 创建 STANDARD 模式的 Stream CREATE TABLE STREAM data_change_test_stream ON TABLE data_change_test WITH PROPERTIES ('TABLE_STREAM_MODE' = 'STANDARD'); -- 插入、更新、删除 INSERT INTO data_change_test VALUES (2, 'banana'); UPDATE data_change_test SET name = 'orange' WHERE id = 2; DELETE FROM data_change_test WHERE id = 1; -- 查看 Stream 数据(STANDARD 模式反映净变化) SELECT __change_type, id, name FROM data_change_test_stream ORDER BY id, __change_type;

+-----------------+----+--------+ | __change_type | id | name | +-----------------+----+--------+ | DELETE | 1 | apple | | INSERT | 2 | orange | +-----------------+----+--------+

STANDARD 模式反映的是净变化(net change):

  • id=1
    id=1
    :被删除 → 一条
    DELETE
    DELETE
    行,值为删除前的数据
  • id=2
    id=2
    :先 INSERT 后 UPDATE → 合并为一条
    INSERT
    INSERT
    行,值为最终状态
    orange
    orange
    (中间的
    banana
    banana
    状态不单独出现)

如果需要看到

UPDATE_BEFORE
UPDATE_BEFORE
/
UPDATE_AFTER
UPDATE_AFTER
,需要在 Stream 创建后先消费一次(推进 offset),再执行 UPDATE 操作。

Kafka Table Stream

支持在Kafka外部表上创建表流(Table Stream),实现实时消费Topic数据.

语法

CREATE TABLE STREAM [IF NOT EXISTS] stream_name ON TABLE external_kafka_table [TIMESTAMP AS OF timestamp_expression] -- 可选,指定消费起始时间点位 WITH PROPERTIES ( 'table_stream_mode' = 'append_only', -- 仅支持追加模式 'show_initial_rows' = 'true'|'false' );

参数说明

  • TIMESTAMP AS OF
    TIMESTAMP AS OF
    :指定消费起始时间点(可选),支持格式:
    • 明确时间戳
      '2023-01-01 12:00:00'
      '2023-01-01 12:00:00'
    • 时间函数
      CURRENT_TIMESTAMP() - INTERVAL '1' HOUR
      CURRENT_TIMESTAMP() - INTERVAL '1' HOUR
  • show_initial_rows
    show_initial_rows
    :控制初始数据加载行为:
    • true
      true
      :加载从外部表创建时指定的点位到Table Stream指定的点位之间的历史数据
    • false
      false
      :从最新点位(
      latest
      latest
      )开始消费,不加载历史数据(默认值)
  • table_stream_mode
    table_stream_mode
    : 固定为
    append_only
    append_only
    ,仅处理Kafka新增数据(不支持更新/删除操作)

示例

  1. Kafka 外部表创建 首先,你需要创建一个 Kafka 外部表。这是创建 Table Stream 的基础。以下是创建 Kafka 外部表的语法:

创建Kafka Connection

CREATE STORAGE CONNECTION pipe_kafka TYPE kafka BOOTSTRAP_SERVERS = ['47.99.48.62:9092'] SECURITY_PROTOCOL = 'PLAINTEXT';

创建Kafka外部表

CREATE EXTERNAL TABLE external_table_kafka ( key_column binary, value_column binary NOT NULL) USING kafka OPTIONS ( 'group_id' = 'external_table_lh', 'topics' = 'test_long') CONNECTION pipe_kafka;

  • external_table_kafka
    external_table_kafka
     是外部表的名称。
  • key_column
    key_column
     和 
    value_column
    value_column
     分别代表 Kafka 消息的键和值,其中 
    value_column
    value_column
     是必需的。
  • USING kafka
    USING kafka
     指定了使用 Kafka 作为数据源。
  • OPTIONS
    OPTIONS
     部分包含了 Kafka 消费者配置,如消费者组 ID (
    group_id
    group_id
    ) 和要订阅的主题 (
    topics
    topics
    )。
  • CONNECTION pipe_kafka
    CONNECTION pipe_kafka
     指定了与 Kafka 的连接配置,这通常包括 Kafka 集群的地址和其他连接参数。
  1. 创建 Table Stream

在 Kafka 外部表的基础上,你可以创建一个 Table Stream,用于实时处理 Kafka 数据流。以下是创建 Table Stream 的语法:

CREATE TABLE STREAM kafka_table_stream_pipe1 ON TABLE external_table_kafka WITH PROPERTIES ( 'table_stream_mode' = 'append_only', 'show_initial_rows' = 'true');

Volume Stream

CREATE VOLUME STREAM 语法

用于在 External Volume 的 Directory Table 上创建 Volume Stream,以监听外部对象存储中文件的新增与删除事件。

CREATE [OR REPLACE] STREAM [IF NOT EXISTS] <name> ON VOLUME <volume_name>;

参数说明

<name>
<name>
:Volume Stream 的名称。

<volume_name>
<volume_name>
:要监听的 External Volume 名称。该 Volume 必须满足以下条件:

  • 已开启目录功能,即建表时指定 DIRECTORY = (ENABLE = TRUE);
  • 已开启递归监听,即建表时指定 RECURSIVE = TRUE。

示例

-- 创建Volume Stream,追踪文件新增和删除 CREATE OR REPLACE STREAM str_app_log ON VOLUME vol_app_log;

注意事项

  • 创建 Volume Stream 之前,须开启 DIRECTORY = (ENABLE = TRUE) 和 RECURSIVE = TRUE 。
  • Volume Stream 本质上是构建在 Directory Table 之上的 Table Stream,消费机制与 Table Stream 完全一致:仅 DML 语句(如 INSERT INTO ... SELECT FROM stream)会推进消费点位,纯 SELECT 查询不会推进。
  • 由于对象存储事件通过消息队列传递存在约 1 分钟的延迟,文件上传后需等待约 1 分钟才能在 Volume Stream 中查询到对应的变更记录。

相关指南

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询