Notes

  • Use dbt-core 1.5.11 or later and dbt-clickzetta 0.2.32 or later.

Preparation

  1. Install dbt and the ClickZetta Lakehouse adapter
pip install dbt-core

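Note: If the latest dbt-core release turns out to be incompatible, install version 1.5.11 instead, for example with pip install dbt-core==1.5.11.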

  2. Install the dbt-clickzetta adapter (this guide uses v0.2.32, which supports DYNAMIC TABLE and materialized views):
pip install dbt-clickzetta
  3. Initialize the dbt project
$ dbt init cz_dbt_project
01:35:39  Running with dbt=1.5.11
Which database would you like to use?
[1] clickzetta

(Don't see the one you want? https://docs.getdbt.com/docs/available-adapters)

Enter a number: 1
base_url (https://clickzetta.com): uat-api.clickzetta.com
workspace (dev workspace): ql_ws
instance_name (dev instance name): jnsxwfyr
vc_name (vc name): default
user_name (user name): <user_name>
schema (default schema): dbt_dev
password (password): <your_passwd>
01:37:13  Profile cz_dbt_project written to /Users/username/.dbt/profiles.yml using target's profile_template.yml and your supplied values. Run 'dbt debug' to validate the connection.
01:37:13  
Your new dbt project "cz_dbt_project" was created!

$ cd cz_dbt_project
  4. Configure the ClickZetta dbt project profiles

Open ~/.dbt/profiles.yml and add a production target (prod) next to the dev target, for example:

cz_dbt_project:
  target: dev
  outputs:
    prod:
      type: clickzetta
      service: api.clickzetta.com
      instance: <your_instance_name>
      username: <user_name>
      password: <passwd>
      workspace: <your_workspace_name>
      schema: dbt_prod
      vcluster: default
    dev:
      type: clickzetta
      service: api.clickzetta.com
      instance: <your_instance_name>
      username: <user_name>
      password: <passwd>
      workspace: <your_workspace_name>
      schema: dbt_dev
      vcluster: default
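dbt uses the output named by the top-level target key (dev here) unless another one is selected on the command line with --target, for example dbt run --target prod. The Python sketch below mimics that lookup on an in-memory copy of the profile above and checks that the chosen output carries the connection keys used in this example. The helper resolve_target and the REQUIRED_KEYS list are hypothetical illustrations, not part of dbt or dbt-clickzetta.

```python
# Hypothetical helper: mimics how dbt picks one output from a profile.
# REQUIRED_KEYS mirrors the keys used in the profiles.yml example above;
# it is an assumption, not the adapter's authoritative list.
from typing import Optional

REQUIRED_KEYS = ("type", "service", "instance", "username",
                 "password", "workspace", "schema", "vcluster")


def resolve_target(profile: dict, cli_target: Optional[str] = None) -> dict:
    """Return the output selected by --target, else by the profile's 'target' key."""
    name = cli_target or profile["target"]
    outputs = profile["outputs"]
    if name not in outputs:
        raise KeyError(f"target {name!r} not found; available: {sorted(outputs)}")
    output = outputs[name]
    missing = [key for key in REQUIRED_KEYS if key not in output]
    if missing:
        raise ValueError(f"target {name!r} is missing keys: {missing}")
    return output


def _output(schema: str) -> dict:
    # Placeholder values, as in the profiles.yml example.
    return {"type": "clickzetta", "service": "api.clickzetta.com",
            "instance": "<your_instance_name>", "username": "<user_name>",
            "password": "<passwd>", "workspace": "<your_workspace_name>",
            "schema": schema, "vcluster": "default"}


profile = {"target": "dev",
           "outputs": {"prod": _output("dbt_prod"), "dev": _output("dbt_dev")}}

print(resolve_target(profile)["schema"])          # dbt_dev
print(resolve_target(profile, "prod")["schema"])  # dbt_prod
```

Keeping dev and prod in one profile means the same models can be promoted by switching only the target.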
  5. Verify the configuration
$ dbt debug

02:22:03    Connection test: [OK connection ok]

02:22:03  All checks passed!
  6. Run a test

Running dbt run builds, in the dev target, the two example models that come with the new dbt project:

  • model.cz_dbt_project.my_first_dbt_model
  • model.cz_dbt_project.my_second_dbt_model
$ dbt run

Check the log for a successful run, and confirm that the my_first_dbt_model and my_second_dbt_model objects were created in the target schema (for example, dbt_dev).

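Building an incremental pipeline on a Table Stream

Scenario

First, define the table that external tools write to as the source table, and create a Table Stream on it to capture incremental changes.

Next, create a dbt incremental model (materialized='incremental') that reads from the Table Stream.

Finally, run the model several times and observe how the increments are processed.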

Prepare the source table

Create a raw table that a data integration tool loads continuously:

CREATE TABLE public.ecommerce_events_multicategorystore_live(
  `event_time` timestamp,
  `event_type` string,
  `product_id` string,
  `category_id` string,
  `category_code` string,
  `brand` string,
  `price` decimal(10,2),
  `user_id` string,
  `user_session` string)
TBLPROPERTIES(
  'change_tracking'='true');

The table property 'change_tracking'='true' must be set to enable change data capture on the table.

Create a Table Stream on the raw table to track its changes:

-- Create a stream on the source table
CREATE TABLE STREAM public.stream_ecommerce_events
ON TABLE public.ecommerce_events_multicategorystore_live
WITH PROPERTIES ('TABLE_STREAM_MODE' = 'APPEND_ONLY');

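Also create a sources.yml file under the models directory of cz_dbt_project that declares the two source objects created above, the raw table and its Table Stream. The source name declared there is the first argument that the models pass to source().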
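In dbt, source('<source_name>', '<table_name>') compiles to the relation declared in sources.yml, which is why the compiled SQL later in this guide reads from public.stream_ecommerce_events. The Python sketch below imitates that lookup on a dict shaped like a version 2 sources.yml. It assumes a source named quning (the name used by the first events_enriched model) whose schema is public; render_source is a hypothetical helper, not a dbt API.

```python
# Hypothetical sketch of how a source() call resolves to a relation name.
# The structure mirrors a dbt sources.yml (version 2, sources -> tables);
# the source name "quning" and schema "public" are assumptions.
SOURCES = {
    "version": 2,
    "sources": [
        {
            "name": "quning",
            "schema": "public",
            "tables": [
                {"name": "ecommerce_events_multicategorystore_live"},
                {"name": "stream_ecommerce_events"},
            ],
        }
    ],
}


def render_source(config: dict, source_name: str, table_name: str) -> str:
    """Return 'schema.table' for a declared source table, as in the compiled SQL."""
    for source in config["sources"]:
        if source["name"] != source_name:
            continue
        # dbt uses the source's name as its schema when 'schema' is omitted.
        schema = source.get("schema", source["name"])
        for table in source["tables"]:
            if table["name"] == table_name:
                return f"{schema}.{table_name}"
    raise KeyError(f"source {source_name!r}.{table_name!r} is not declared")


print(render_source(SOURCES, "quning", "stream_ecommerce_events"))
# public.stream_ecommerce_events
```

Whatever source name you choose, every source() call in the models must use that same name.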

Develop the model

Create a dbt model named events_enriched.sql and declare it as an incremental model through its config:

{{
   config(
       materialized='incremental'
   )
}}

SELECT
  event_time,
  CAST(SUBSTRING(event_time,0,19) AS TIMESTAMP) AS event_timestamp, 
  SUBSTRING(event_time,12,2) AS event_hour, 
  SUBSTRING(event_time,15,2) AS event_minute, 
  SUBSTRING(event_time,18,2) AS event_second, 
  event_type,
  product_id, 
  category_id,
  category_code,
  brand,
  CAST(price AS DECIMAL(10,2)) AS price, 
  user_id, 
  user_session,
  CAST(SUBSTRING(event_time,0,19)AS date) AS event_date,
  CURRENT_TIMESTAMP() as loaded_at
FROM
{% if is_incremental() %}
{{source('quning', 'stream_ecommerce_events')}} 
{% else %}
{{source('quning', 'ecommerce_events_multicategorystore_live')}} 
{% endif %}
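The model derives its time columns from fixed character positions in event_time. The Python sketch below reproduces that arithmetic on a hypothetical sample value, assuming event_time renders as 'YYYY-MM-DD HH:MM:SS' optionally followed by a suffix, and assuming Spark-style SQL SUBSTRING semantics (1-based positions, with position 0 treated like 1). The sql_substring and derive helpers are illustrative only.

```python
# Mimics the SUBSTRING-based column derivations in events_enriched.sql.
# Assumption: SUBSTRING(s, pos, len) is 1-based and pos 0 behaves like pos 1
# (Spark-style semantics); check this against ClickZetta's documentation.
from datetime import datetime


def sql_substring(s: str, pos: int, length: int) -> str:
    start = pos - 1 if pos > 0 else 0  # convert a 1-based position to a 0-based index
    return s[start:start + length]


def derive(event_time: str) -> dict:
    prefix = sql_substring(event_time, 0, 19)  # 'YYYY-MM-DD HH:MM:SS'
    ts = datetime.strptime(prefix, "%Y-%m-%d %H:%M:%S")
    return {
        "event_timestamp": ts,
        "event_hour": sql_substring(event_time, 12, 2),
        "event_minute": sql_substring(event_time, 15, 2),
        "event_second": sql_substring(event_time, 18, 2),
        "event_date": ts.date(),
    }


row = derive("2019-11-01 08:15:42 UTC")  # hypothetical sample value
print(row["event_hour"], row["event_minute"], row["event_second"])  # 08 15 42
print(row["event_date"])  # 2019-11-01
```

Positions 12, 15 and 18 are the first characters of the hour, minute and second fields in that layout.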

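Notes:

  • When the target table does not exist yet, as on the first dbt build here, is_incremental() returns False and the model is built from the full contents of the raw table ecommerce_events_multicategorystore_live.
  • Once the target table exists, later runs (dbt run here) see is_incremental() return True and process only the incremental data from the Table Stream.

The branch depends on whether the target table already exists (and whether --full-refresh was passed), not on whether you use dbt build or dbt run.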
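The rule in these notes can be sketched in Python. dbt's built-in is_incremental() macro returns True only when the model is materialized as incremental, the target relation already exists as a table, and the run is not a full refresh. The functions below are a simplified restatement of that rule, not dbt's own code; choose_from is a hypothetical helper that mirrors the if/else in events_enriched.sql.

```python
# Simplified restatement of dbt's is_incremental() rule (not dbt's code).
from typing import Optional


def is_incremental(materialized: str, existing_relation_type: Optional[str],
                   full_refresh: bool = False) -> bool:
    """existing_relation_type is None when the target table does not exist yet."""
    return (materialized == "incremental"
            and existing_relation_type == "table"
            and not full_refresh)


def choose_from(existing_relation_type: Optional[str], full_refresh: bool = False) -> str:
    # Mirrors the {% if is_incremental() %} branch in events_enriched.sql.
    if is_incremental("incremental", existing_relation_type, full_refresh):
        return "public.stream_ecommerce_events"
    return "public.ecommerce_events_multicategorystore_live"


print(choose_from(None))                        # first build: full raw table
print(choose_from("table"))                     # later runs: Table Stream
print(choose_from("table", full_refresh=True))  # --full-refresh: full raw table again
```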

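If you want both the initial build and later runs to use only the incremental data in the Table Stream (by default, a Table Stream returns only the changes made after it was created), define the model as follows: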

{{
   config(
       materialized='incremental'
   )
}}

SELECT 
  `event_time` ,
  `event_type` ,
  `product_id` ,
  `category_id` ,
  `category_code` ,
  `brand` ,
  `price` ,
  `user_id` ,
  `user_session` ,
  CURRENT_TIMESTAMP() as load_time
  FROM 
{{source('public', 'stream_ecommerce_events')}}

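Build the model

Create the model in the target environment with dbt build:

dbt build --model events_enriched

The log shows that the model is created with the following statement, which transforms the full contents of the raw table on this initial build: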

/* {"app": "dbt", "dbt_version": "1.5.11", "profile_name": "cz_dbt_project", "target_name": "dev", "node_id": "model.cz_dbt_project.events_enriched"} */
create table dbt_dev.events_enriched
as
SELECT
  event_time,
  CAST(SUBSTRING(event_time,0,19) AS TIMESTAMP) AS event_timestamp, 
  SUBSTRING(event_time,12,2) AS event_hour, 
  SUBSTRING(event_time,15,2) AS event_minute, 
  SUBSTRING(event_time,18,2) AS event_second, 
  event_type,
  product_id, 
  category_id,
  category_code,
  brand,
  CAST(price AS DECIMAL(10,2)) AS price, 
  user_id, 
  user_session,
  CAST(SUBSTRING(event_time,0,19)AS date) AS event_date,
  CURRENT_TIMESTAMP() as loaded_at
FROM
public.ecommerce_events_multicategorystore_live;

Check the data object in the Lakehouse target environment:

select count(*)  from events_enriched ;
`count`(*) 
---------- 
3700

The data object was created and loaded with the initial data.

Run the model

Run the model with dbt run:

dbt run --model events_enriched

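Following dbt's incremental model logic, dbt first creates a temporary view in the target environment that represents the incremental data: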

/* {"app": "dbt", "dbt_version": "1.5.11", "profile_name": "cz_dbt_project", "target_name": "dev", "node_id": "model.cz_dbt_project.events_enriched"} */

create or replace view dbt_dev.events_enriched__dbt_tmp as
SELECT
  event_time,
  CAST(SUBSTRING(event_time,0,19) AS TIMESTAMP) AS event_timestamp, 
  SUBSTRING(event_time,12,2) AS event_hour, 
  SUBSTRING(event_time,15,2) AS event_minute, 
  SUBSTRING(event_time,18,2) AS event_second, 
  event_type,
  product_id, 
  category_id,
  category_code,
  brand,
  CAST(price AS DECIMAL(10,2)) AS price, 
  user_id, 
  user_session,
  CAST(SUBSTRING(event_time,0,19)AS date) AS event_date,
  CURRENT_TIMESTAMP() as loaded_at
FROM
public.stream_ecommerce_events;

dbt then uses MERGE INTO, as dictated by the materialized='incremental' configuration, to write the incremental data from the Table Stream into the target model:

/* {"app": "dbt", "dbt_version": "1.5.11", "profile_name": "cz_dbt_project", "target_name": "dev", "node_id": "model.cz_dbt_project.events_enriched"} */

    -- back compat for old kwarg name
merge into dbt_dev.events_enriched as DBT_INTERNAL_DEST
      using dbt_dev.events_enriched__dbt_tmp as DBT_INTERNAL_SOURCE
      on FALSE
  when not matched then insert
     (`event_time`,`event_timestamp`,`event_hour`,`event_minute`,`event_second`,`event_type`,`product_id`,`category_id`,`category_code`,`brand`,`price`,`user_id`,`user_session`,`event_date`,`loaded_at`)
  values (
        DBT_INTERNAL_SOURCE.`event_time`,DBT_INTERNAL_SOURCE.`event_timestamp`,DBT_INTERNAL_SOURCE.`event_hour`,DBT_INTERNAL_SOURCE.`event_minute`,DBT_INTERNAL_SOURCE.`event_second`,DBT_INTERNAL_SOURCE.`event_type`,DBT_INTERNAL_SOURCE.`product_id`,DBT_INTERNAL_SOURCE.`category_id`,DBT_INTERNAL_SOURCE.`category_code`,DBT_INTERNAL_SOURCE.`brand`,DBT_INTERNAL_SOURCE.`price`,DBT_INTERNAL_SOURCE.`user_id`,DBT_INTERNAL_SOURCE.`user_session`,DBT_INTERNAL_SOURCE.`event_date`,DBT_INTERNAL_SOURCE.`loaded_at`
    );
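Because events_enriched sets no unique_key, dbt's merge uses the predicate on FALSE: no source row can match a target row, so every row takes the when not matched then insert branch and the MERGE acts as a plain append. Avoiding duplicates therefore relies on the Table Stream handing each change over only once. The Python sketch below models that behavior with lists of dicts; the keyed path (update on match, insert otherwise) is included only for contrast, and both the merge function and the user_session key are hypothetical.

```python
# Hypothetical model of MERGE INTO ... ON <predicate> semantics.
# unique_key=None reproduces "on FALSE": nothing matches, so all rows are inserted.
from typing import Optional


def merge(target: list, source: list, unique_key: Optional[str] = None) -> list:
    result = [dict(row) for row in target]
    for src in source:
        match = None
        if unique_key is not None:  # ON DEST.key = SOURCE.key
            match = next((row for row in result if row[unique_key] == src[unique_key]), None)
        if match is None:
            result.append(dict(src))  # WHEN NOT MATCHED THEN INSERT
        else:
            match.update(src)         # WHEN MATCHED THEN UPDATE
    return result


target = [{"user_session": "s1", "price": 10}]
batch = [{"user_session": "s1", "price": 12}, {"user_session": "s2", "price": 5}]

print(len(merge(target, batch)))                  # 3: on FALSE appends every row
print(len(merge(target, batch, "user_session")))  # 2: s1 updated, s2 inserted
```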

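Each dbt run reads the data in the Table Stream and merges it into the target model. After the write succeeds, the stream's change-tracking offset advances automatically, so the next dbt run picks up only the newest changes.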
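The offset behavior can be pictured with a small in-memory model. This Python sketch is conceptual, not ClickZetta's implementation: an APPEND_ONLY stream is modeled as a log of appended rows plus an offset, reading returns the rows past the offset, and the offset advances only after the consuming write succeeds. That a failed write leaves the offset in place is an assumption inferred from the description, and the class and method names are hypothetical.

```python
# Conceptual model of an APPEND_ONLY Table Stream (hypothetical, not ClickZetta code).
class AppendOnlyStream:
    def __init__(self) -> None:
        self._log: list = []  # rows appended to the source table after stream creation
        self._offset = 0      # position up to which changes have been consumed

    def on_insert(self, row: dict) -> None:
        self._log.append(row)

    def read(self) -> list:
        """Rows not yet consumed; reading alone does not move the offset."""
        return self._log[self._offset:]

    def consume(self, write) -> int:
        """Pass pending rows to write(); advance the offset only if it returns normally."""
        pending = self.read()
        write(pending)  # an exception here leaves the offset unchanged (assumed)
        self._offset += len(pending)
        return len(pending)


target: list = []
stream = AppendOnlyStream()
stream.on_insert({"product_id": "p1", "price": 10})
stream.on_insert({"product_id": "p2", "price": 20})
print(stream.consume(target.extend))  # 2: first dbt run picks up both rows
print(stream.consume(target.extend))  # 0: nothing new since the last run
stream.on_insert({"product_id": "p3", "price": 30})
print(stream.consume(target.extend))  # 1: only the newest row
```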

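Building a pipeline on a dynamic table

Scenario

Continuing the previous scenario, this section uses the dbt-clickzetta dynamic table materialization to aggregate the already transformed table.

First, create a dbt model that uses a dynamic table (materialized='dynamic_table') and configure its refresh interval and the compute resources (virtual cluster) used for refreshes, so that after the model is built the system refreshes it automatically on that schedule.

Then, observe how the dynamic table model is built and refreshed in the target environment.

Develop the model

Create a dynamic table model named product_grossing.sql.

  • Model definition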
{{
   config(
       materialized = 'dynamic_table',
       vcluster = 'default',
       refresh_interval = '5 minute'
   )
}}

select 
  event_date,
  product_id,
  sum(price) sum_price 
from 
  {{ ref("events_enriched")}}
group by event_date,product_id
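The dynamic table maintains an ordinary GROUP BY aggregate. The Python sketch below computes the same sum(price) per (event_date, product_id) on a few made-up rows, using Decimal to match the DECIMAL(10,2) price column; the sample rows and the product_grossing function are hypothetical. Keeping this result current as events_enriched grows is the work that the dynamic table's scheduled refresh takes over from dbt.

```python
# Reproduces the product_grossing aggregation on hypothetical sample rows.
from collections import defaultdict
from datetime import date
from decimal import Decimal


def product_grossing(rows: list) -> dict:
    """sum(price) grouped by (event_date, product_id)."""
    totals: dict = defaultdict(lambda: Decimal("0.00"))
    for row in rows:
        totals[(row["event_date"], row["product_id"])] += row["price"]
    return dict(totals)


rows = [
    {"event_date": date(2019, 11, 1), "product_id": "p1", "price": Decimal("10.50")},
    {"event_date": date(2019, 11, 1), "product_id": "p1", "price": Decimal("4.25")},
    {"event_date": date(2019, 11, 1), "product_id": "p2", "price": Decimal("7.00")},
    {"event_date": date(2019, 11, 2), "product_id": "p1", "price": Decimal("1.00")},
]
for key, total in sorted(product_grossing(rows).items()):
    print(key[0], key[1], total)
```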

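Build the model

Create the model in the target environment with dbt build:

dbt build --model product_grossing

The log shows that dbt creates the dynamic table with the following statement, which aggregates dbt_dev.events_enriched: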

/* {"app": "dbt", "dbt_version": "1.5.11", "profile_name": "cz_dbt_project", "target_name": "dev", "node_id": "model.cz_dbt_project.product_grossing"} */
create or replace dynamic table dbt_dev.product_grossing
refresh interval 5 minute
vcluster default
as
select 
  event_date,
  product_id,
  sum(price) sum_price 
from 
  dbt_dev.events_enriched
group by event_date,product_id;

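dbt can also display the model lineage, for example in the documentation site generated with dbt docs generate and served with dbt docs serve.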
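dbt derives lineage from the source() and ref() calls in each model and builds models in dependency order. The Python sketch below reproduces that ordering for the objects in this guide with the standard library's graphlib.TopologicalSorter (Python 3.9+). The node labels and edges are illustrative, following the source() and ref() calls written in the models, and this is not how dbt itself is implemented.

```python
# Illustrative dependency graph for the objects in this guide.
from graphlib import TopologicalSorter

# Each node maps to the set of nodes it depends on (from source()/ref() calls).
graph = {
    "source:ecommerce_events_multicategorystore_live": set(),
    "source:stream_ecommerce_events": set(),
    "model:events_enriched": {
        "source:ecommerce_events_multicategorystore_live",
        "source:stream_ecommerce_events",
    },
    "model:product_grossing": {"model:events_enriched"},
}

order = list(TopologicalSorter(graph).static_order())
print(order)
```

Because product_grossing refs events_enriched, it always comes last, which is why events_enriched must be built first.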

Check the data objects in the Lakehouse target environment:

show tables;

schema_name table_name               is_view is_materialized_view is_external is_dynamic 
----------- ------------------------ ------- -------------------- ----------- ---------- 
dbt_dev     events_enriched          false   false                false       false      
dbt_dev     events_enriched__dbt_tmp true    false                false       false      
dbt_dev     my_first_dbt_model       false   false                false       false      
dbt_dev     my_second_dbt_model      true    false                false       false

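Also inspect the dynamic table with desc, confirming in particular that the refresh virtual cluster and the refresh interval are as expected: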

desc extended product_grossing;

column_name                  data_type
---------------------------- -------------
event_date                   date
product_id                   string
sum_price                    decimal(20,2)

# detailed table information
schema                       dbt_dev
name                         product_grossing
creator                      xxx
created_time                 2024-06-22 21:03:40.467
last_modified_time           2024-06-22 21:08:41.184
comment
properties                   (("refresh_vc","default"))
type                         DYNAMIC TABLE
view_text                    SELECT events_enriched.event_date, events_enriched.product_id, `sum`(events_enriched.price) AS sum_price FROM ql_ws.dbt_dev.events_enriched GROUP BY events_enriched.event_date, events_enriched.product_id;
view_original_text           select
  event_date,
  product_id,
  sum(price) sum_price
from
  dbt_dev.events_enriched
group by event_date,product_id;
source_tables                [86:ql_ws.dbt_dev.events_enriched=8278006558627319396]
refresh_type                 on schedule
refresh_start_time           2024-06-22 21:03:40.418
refresh_interval_second      300
unique_key_is_valid          true
unique_key_version_info      unique_key_version: 1, explode_sort_key_version: 1, digest: H4sIAAAAAAAAA3NMT9cx0nEP8g8NUHCKVDBScPb3CfX1C+ZSCE5OzANKBfmHx3u7Riq4Bfn7KqSWpeaVFMen5hVlJmekpnABAIf7bMY+AAAA, unique key infos:[sourceTable: 86:ql_ws.dbt_dev.events_enriched, uniqueKeyType: 1,]
format                       PARQUET
format_options               (("cz.storage.parquet.block.size","134217728"),("cz.storage.parquet.dictionary.page.size","2097152"),("cz.storage.parquet.page.size","1048576"))
statistics                   99 rows 4468 bytes
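The refresh_interval = '5 minute' setting in the model config appears in the generated DDL as refresh interval 5 minute and in desc extended as refresh_interval_second 300. The Python sketch below shows that conversion. The interval_seconds helper and its unit table are assumptions for illustration; consult the ClickZetta documentation for the units dynamic tables actually accept.

```python
# Hypothetical converter from a refresh_interval string to seconds.
# The unit table is an assumption for illustration, not ClickZetta's grammar.
UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def interval_seconds(interval: str) -> int:
    amount, unit = interval.split()
    unit = unit.lower().rstrip("s")  # accept '5 minutes' as well as '5 minute'
    if unit not in UNIT_SECONDS:
        raise ValueError(f"unsupported unit: {unit!r}")
    return int(amount) * UNIT_SECONDS[unit]


print(interval_seconds("5 minute"))  # 300, matching refresh_interval_second
```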

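Run the model

A dbt model materialized as a dynamic table is refreshed automatically by Lakehouse on the schedule set at build time, so it does not need to be run with dbt run.

After the model is built in the target environment, you can check the dynamic table's refresh status in Lakehouse with the following SQL command: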

show dynamic table refresh history where name ='product_grossing'

workspace_name schema_name name             virtual_cluster start_time          end_time            duration             state   refresh_trigger  suspended_reason refresh_mode error_message source_tables                                                             stats                                     completion_target job_id                   
-------------- ----------- ---------------- --------------- ------------------- ------------------- -------------------- ------- ---------------- ---------------- ------------ ------------- ------------------------------------------------------------------------- ----------------------------------------- ----------------- ------------------------ 
ql_ws          dbt_dev     product_grossing DEFAULT         2024-06-22 21:08:40 2024-06-22 21:08:41 0 00:00:00.566000000 SUCCEED SYSTEM_SCHEDULED (null)           INCREMENTAL  (null)        [{"schema":"dbt_dev","table_name":"events_enriched","workspace":"ql_ws"}] {"rows_deleted":"0","rows_inserted":"99"} (null)            202406222108406319689694
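The timestamps in these outputs line up with a fixed schedule: desc extended reports refresh_start_time 2024-06-22 21:03:40.418 and refresh_interval_second 300, and the first SYSTEM_SCHEDULED refresh started at 21:08:40. Assuming refreshes fall due every interval after refresh_start_time (an inference from this single sample, not a documented rule), the due times can be computed as below; next_refreshes is a hypothetical helper.

```python
# Computes due times for a fixed-interval schedule (an assumed model of
# SYSTEM_SCHEDULED refreshes, inferred from the sample output above).
from datetime import datetime, timedelta


def next_refreshes(start: datetime, interval_s: int, count: int) -> list:
    step = timedelta(seconds=interval_s)
    return [start + step * k for k in range(1, count + 1)]


start = datetime(2024, 6, 22, 21, 3, 40, 418000)  # refresh_start_time from desc extended
for due in next_refreshes(start, 300, 3):
    print(due.strftime("%Y-%m-%d %H:%M:%S"))
```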
