Dynamic Table 支持参数化定义

Dynamic Table 的参数化定义由两部分组成。

  • 创建分区动态表时,参数通过 SESSION_CONFIGS()['dt.args.xx'] 进行定义,用于写在 SQL 加工逻辑中,表示查询源表。SESSION_CONFIGS()是系统内置函数
    'dt.args.xx'
    'dt.args.xx'
    :DT参数的名称,必须以
    dt.arg.
    dt.arg.
    开头,以避免与系统内部字段冲突。表达的含义和传统调度中select * from source_table where pt=${bizdate},
    SESSION_CONFIGS()['dt.args.pt']
    SESSION_CONFIGS()['dt.args.pt']
    等价于
    pt=${bizdate}
    pt=${bizdate}
    。SESSION_CONFIGS()['dt.args.xx'] 返回值类型为 String。如果需要其它类型的参数,需要使用 CAST 函数进行转换,例如
    cast(SESSION_CONFIGS()['dt.args.xx'] as int)
    cast(SESSION_CONFIGS()['dt.args.xx'] as int)
    。 如下案例:

--源表 CREATE TABLE source_table (col1 string, col2 string, pt string) PARTITIONED BY (pt); --定义动态表 CREATE dynamic TABLE incremental_dt (col1, col2, pt) PARTITIONED BY (pt) AS SELECT col1, nvl(col2, col1), pt FROM source_table WHERE pt = SESSION_CONFIGS () ['dt.args.pt'];

  • 刷新时通过
    refresh dynamic table target_table partition(pt=${bizdate});
    refresh dynamic table target_table partition(pt=${bizdate});
    指定分区值,其中
    pt=${bizdate}
    pt=${bizdate}
    。这对应于传统的
    insert overwrite target_table partition(pt=${bizdate})
    insert overwrite target_table partition(pt=${bizdate})

--上面中定义的动态表分区字段是pt。因此刷新时传入pt=${bizdate}。这里假定bizdate是2024-11-13。刷新时应该使用如下语法 --将2024-11-13传入到创建语句时的SESSION_CONFIGS()['dt.args.pt']中,替换为2024-11-13用于过滤source_table中的数据 SET dt.args.pt = 2024-11-13; --刷新时指定pt=2024-11-13表示写入到动态表的2024-11-13分区中 REFRESH dynamic TABLE incremental_dt PARTITION (pt = '2024-11-13');

全量刷新与增量刷新

全量刷新

全量刷新发生在以下情况:

  1. 非分区表

    • 如果在非分区表中使用了参数
      SESSION_CONFIGS()['dt.args.event_day']
      SESSION_CONFIGS()['dt.args.event_day']
      ,系统会根据参数值的变化决定刷新方式。
    • 如果参数值保持不变,系统将执行增量刷新
    • 如果参数值发生变化,系统将执行全量刷新,因为参数值的变化等同于改变了表的定义。
  2. 分区表

    • 如果分区已存在,但当前刷新的参数与上一次刷新的参数不相同,则进行一次全量刷新,因为参数值的变化会导致 SQL 加工逻辑发生变化。
    • 如果分区不存在(即第一次刷新某个分区),则进行一次全量刷新。

-- 创建源表 CREATE TABLE source_table (col1 string, col2 string, pt string) PARTITIONED BY (pt); -- 创建动态表 CREATE dynamic TABLE incremental_dt (col1, col2, pt) PARTITIONED BY (pt) AS SELECT col1, nvl(col2, col1), pt FROM source_table WHERE pt = SESSION_CONFIGS () ['dt.args.pt']; -- 示例 1:首次设置参数值为 2024-11-13 SET dt.args.xxx = 1; SET dt.args.pt = 2024-11-13; --刷新为全量刷新 REFRESH dynamic TABLE incremental_dt PARTITION (pt = '2024-11-13'); -- 示例 2:果第一次刷新某个分区,对应的参数是一个值,如果后面相同分区对应的参数发生了改变了,则相当于dt在这个分区的定义被修改了,所以会重新刷新,如下案例 SET dt.args.xxx = 2; SET dt.args.pt = 2024-11-14; -- 刷新动态表,指定分区 pt=2024-11-14 -- 系统会执行全量刷新,因为参数值发生了变化 REFRESH dynamic TABLE incremental_dt PARTITION (pt = '2024-11-14');

增量刷新

增量刷新发生在以下情况:

  • 非分区表的参数值保持不变。
  • 分区表的参数值保持不变,且分区条件未改变。

示例代码

-- 创建源表 CREATE TABLE source_table (col1 string, col2 string, pt string) PARTITIONED BY (pt); -- 定义动态表 CREATE dynamic TABLE incremental_dt (col1, col2, pt) PARTITIONED BY (pt) AS SELECT col1, nvl(col2, col1), pt FROM source_table WHERE pt = SESSION_CONFIGS () ['dt.args.pt']; -- 示例 1:首次设置参数值为 2024-11-13 SET dt.args.pt = 2024-11-13; -- 刷新动态表,指定分区 pt=2024-11-13,全量刷新 REFRESH dynamic TABLE target_table PARTITION (pt = '2024-11-13'); -- 示例 2:参数值和分区值未变化,再次刷新 SET dt.args.pt = 2024-11-13; -- 刷新动态表,指定分区 pt=2024-11-13 -- 系统会继续执行增量刷新 REFRESH dynamic TABLE target_table PARTITION (pt = '2024-11-13');

刷新语句

参数化定义的Dynamic Table的刷新行为取决于表是否为分区表。

非分区表刷新语法

REFRESH DYNAMIC TABLE dt;

  • 非分区表的参数值保持不变,会进行增量刷新。非分区表的参数值发生变化,会进行全量刷新。

分区表刷新语法

REFRESH DYNAMIC TABLE dt PARTITION partition_spec;

语句刷新分区表时,必须按照表的分区层级顺序指定

partition_spec
partition_spec
。这意味着,如果表按照多个字段进行分区,这些字段需要按照从最高级别到最低级别的顺序被指定。

  • 多级分区,partition_spec 需要根据分区的层级依次指定。例如,如果表有三级分区(day, hour, min),必须从高到低依次指定,不能跳过某个分区。指定 day hour 是合法的,低级分区可以忽略,不用全声明。指定 day min 是不合法的,因为它跳过了 hour

    • --不合法的 set dt.args.day = 1; set dt.args.min = 1; REFRESH dynamic TABLE incremental_dt PARTITION (DAY = 1, MIN = 1); --合法的 set dt.args.day = 1; set dt.args.hour = 1; REFRESH dynamic TABLE incremental_dt PARTITION (DAY = 1, HOUR = 1);

示例说明

假设一个表按照

day
day
hour
hour
min
min
三级进行分区,正确的
partition_spec
partition_spec
指定方式如下:

  • 合法指定:可以指定高层级和部分低层级分区,但不可跳过任何中间层级分区。

set dt.args.day=2024-11-13; set dt.args.hour=23; REFRESH DYNAMIC TABLE dt PARTITION (day='2024-11-13', hour=23);

在这个例子中,

day
day
hour
hour
被指定,而
min
min
分区可以被忽略。

  • 不合法指定:跳过任何中间层级分区的指定是不被允许的。

set dt.args.day=2024-11-13 set dt.args.hour=30 REFRESH DYNAMIC TABLE dt PARTITION (day='2024-11-13', min=30);

注意事项

  • 参数与分区一致性:在执行 Dynamic Table 的刷新操作时,必须确保 SQL 计算逻辑中使用的分区参数值与 REFRESH 语句中指定的分区值保持一致。如果存在不一致,系统将在执行过程中报错。

CREATE dynamic TABLE incremental_dt (col1, col2, pt) PARTITIONED BY (pt) AS SELECT col1, nvl(col2, col1), pt FROM source_table WHERE pt = SESSION_CONFIGS () ['dt.args.pt']; --比如select col1, nvl(col2, col1), pt from source_table where pt = SESSION_CONFIGS()['dt.args.pt'];过滤出来对应的分区字段结果是9.系统将在执行过程中报错, set dt.args.event_day = 9; REFRESH dynamic TABLE event_gettime_pt PARTITION (event_day = 19);

  • 并发刷新任务:在这些命令中,参数值与分区值匹配,因此可以并发执行而不会发生冲突。只要分区之间不存在冲突,系统允许同时执行多个分区的刷新任务。

-- 为分区 event_day=19 设置参数并刷新 set dt.args.event_day = 19; REFRESH dynamic TABLE event_gettime_pt PARTITION (event_day = 19); -- 为分区 event_day=20 设置参数并刷新 set dt.args.event_day = 20; REFRESH dynamic TABLE event_gettime_pt PARTITION (event_day = 20);

使用案例

  • 动态表分区字段和源表中字段一致

CREATE TABLE event_tb_pt ( event STRING, process DOUBLE, event_time TIMESTAMP ); INSERT INTO event_tb_pt VALUES ('event-0', 20.0, TIMESTAMP '2024-09-20 14:43:13'), ('event-0', 20.0, TIMESTAMP '2024-09-19 11:40:13'), ('event-1', 20.0, TIMESTAMP '2024-09-19 11:40:13'); --创建动态表 CREATE dynamic table event_gettime_pt partitioned by(event) AS SELECT event, process, YEAR(event_time) event_year, MONTH(event_time) event_month, DAY(event_time) event_day FROM event_tb_pt where event=SESSION_CONFIGS()['dt.args.event']; --刷新动态表 set dt.args.event = event-0; REFRESH dynamic TABLE event_gettime_pt PARTITION (event = 'event-0'); SELECT *FROM event_gettime_pt;

  • 动态表分区字段和源表字段名不一致。过滤条件需要根据
    event
    event
    字段过滤,而动态表的分区字段是
    event_year
    event_year

DROP TABLE IF EXISTS event_tb_pt; CREATE TABLE event_tb_pt ( event STRING, process DOUBLE, event_time TIMESTAMP ); INSERT INTO event_tb_pt VALUES ('event-0', 20.0, TIMESTAMP '2024-09-20 14:43:13'), ('event-0', 20.0, TIMESTAMP '2024-09-19 11:40:13'), ('event-1', 20.0, TIMESTAMP '2024-09-19 11:40:13'); --创建动态表 DROP dynamic TABLE IF EXISTS event_gettime_pt; CREATE dynamic table event_gettime_pt partitioned by(event_year) AS SELECT event, process, YEAR(event_time) event_year, MONTH(event_time) event_month, DAY(event_time) event_day FROM event_tb_pt where event=SESSION_CONFIGS()['dt.args.event']; --刷新动态表 set dt.args.event = event-0; REFRESH dynamic TABLE event_gettime_pt PARTITION (event_year = 2024); SELECT * FROM event_gettime_pt;

  • 多级分区刷新

DROP TABLE IF EXISTS event_tb_pt; CREATE TABLE event_tb_pt ( event STRING, process DOUBLE, event_time TIMESTAMP ); INSERT INTO event_tb_pt VALUES ('event-0', 20.0, TIMESTAMP '2024-09-20 14:43:13'), ('event-0', 20.0, TIMESTAMP '2024-09-19 11:40:13'), ('event-1', 20.0, TIMESTAMP '2024-09-19 11:40:13'); --创建动态表 DROP dynamic TABLE IF EXISTS event_gettime_pt; CREATE dynamic table event_gettime_pt partitioned by(event_year,event_month,event_day) AS SELECT event, process, YEAR(event_time) event_year, MONTH(event_time) event_month, DAY(event_time) event_day FROM event_tb_pt where event=SESSION_CONFIGS()['dt.args.event']; --多级分区刷新,指定高层级分区 set dt.args.event = event-0; REFRESH dynamic TABLE event_gettime_pt PARTITION (event_year = 2024,event_month =9);

场景案例

案例一:将离线任务转换为增量任务

本节将指导用户如何将原有的离线任务转换为增量任务,以实现更高效的数据处理。以下是一个基于“传统调度”的具体操作步骤,适用于业务逻辑按天对齐和按天调度刷新的场景。

  • 步骤 1:参数化原始 SQL。原始 SQL如下

    with tmp_channel as ( select channel_code, channel_name, channel_type, channel_uid from dim.dim_shop_sales_channel_main where pt = '${bizdate}' ), tmp_bac_misc as ( select mini_number, bac_no from dim.dim_customer_bac_misc_df where pt = '${bizdate}' ), tmp_fxiaoke as ( select case when record_type in ('dealer__c') then nvl(bac_no, account_no) else account_no end as channel_code, id, account_no from ods.ods_account_obj as a left join tmp_bac_misc on a.account_no = tmp_bac_misc.mini_number where pt = '${bizdate}' and account_no is not null -- and is_deleted = 0 -- and life_status not in ('invalid', 'ineffective') ) insert overwrite table dim.dim_shop_sales_channel_misc partition(pt='${bizdate}') select tmp_channel.channel_code, channel_name, channel_type, channel_uid, id as fxiaoke_id, account_no as fxiaoke_account_no from tmp_channel left join tmp_fxiaoke on tmp_channel.channel_code = tmp_fxiaoke.channel_code ;

    首先,需要将原始 SQL 中的所有由调度引擎传入的参数

    ${bizdate}
    ${bizdate}
    替换为
    SESSION_CONFIGS()['dt.args.bizdate']
    SESSION_CONFIGS()['dt.args.bizdate']
    。这一步骤将使得参数值可以通过配置动态传入,而不是硬编码在 SQL 中。

    原始 SQL 参数替换: 将所有

    ${bizdate}
    ${bizdate}
    替换为
    SESSION_CONFIGS()['dt.args.bizdate']
    SESSION_CONFIGS()['dt.args.bizdate']

    create dynamic table im.dim_shop_sales_channel_misc partitioned by(pt) with tmp_channel as ( select channel_code, channel_name, channel_type, channel_uidfrom dim.dim_shop_sales_channel_main where pt = SESSION_CONFIGS()['dt.args.bizdate'] ), tmp_bac_misc as ( select mini_number, bac_nofrom dim.dim_customer_bac_misc_df where pt = SESSION_CONFIGS()['dt.args.bizdate'] ), tmp_fxiaoke as ( select case when record_type in ('dealer__c') then nvl(bac_no, account_no) else account_noend as channel_code, id, account_nofrom ods.ods_account_obj as aleft join tmp_bac_misc on a.account_no = tmp_bac_misc.mini_number where pt = SESSION_CONFIGS()['dt.args.bizdate'] and account_no is not null ) select tmp_channel.channel_code, channel_name, channel_type, channel_uid, id as fxiaoke_id, account_no as fxiaoke_account_no, pt from tmp_channel left join tmp_fxiaoke on tmp_channel.channel_code = tmp_fxiaoke.channel_code ;

  • 步骤 2:调度刷新命令 在每次调度时,需要将参数

    dt.args.bizdate
    dt.args.bizdate
    设置为具体的日期值,并执行刷新命令。

    调度刷新命令示例

    SET dt.args.bizdate=20241130; -- ${bizdate}由Studio每次替换为具体的值 REFRESH DYNAMIC TABLE DT PARTITION (pt ='20241130');

案例二:增量任务数据补数

在某些情况下,用户可能需要向已有的分区中补充数据。

  • 方法 1:向源表补充数据,用户可以直接向源表中补充数据。这些补充的数据将通过相应的 REFRESH 任务自动反映到 Dynamic Table(DT)中。

    操作步骤

    1. 直接向源表插入或更新数据。
    2. 执行 REFRESH 任务,以将更改同步到 DT 中。
  • 方法 2:使用 DML 语句直接向 DT 补充数据,用户也可以使用 DML 语句直接向 DT 的特定分区中插入数据。 操作步骤

    1. 使用 DML 语句向 DT 的特定分区插入数据。
    2. 请注意,直接修改 DT 将导致下一次该分区的全量刷新。如果用户不希望出现全量刷新的结果,应避免调度该分区的 REFRESH 任务。 示例代码

    INSERT INTO DYNAMIC TABLE incremental_dt VALUES (...);

注意事项

  • 直接向 DT 插入的数据将参与 DT 的下游计算。如果下游的老分区不需要这些数据,请不要调度涉及这些数据的分区的 REFRESH 任务。
  • 其他未受影响的分区仍然可以进行增量刷新。

案例三:在不同 VC 中执行增量任务

对于参数化声明的分区化 DT,不同分区的刷新任务可以同时执行。用户可以根据需要将不同的 REFRESH 任务分配到不同的虚拟集群(VC)中执行。 操作步骤

  1. 根据时效性要求和资源需求,将不同的 REFRESH 任务分配到不同的 VC 中。
  2. 例如,对于时效性要求较高的新分区,可以将其 REFRESH 任务放在资源较多的大 VC 中执行。
  3. 对于其他老分区的补充任务,可以将其 REFRESH 任务放在资源较少的小 VC 中执行。
联系我们
预约咨询
微信咨询
电话咨询
邮件咨询