DBT BigQuery 迁移实战:零售数仓管道

如果你在 BigQuery 上用 dbt 构建了数仓管道,迁移到 ClickZetta Lakehouse 的核心工作量集中在 4 处平台差异上,标准 SQL 模型基本不用改。

本文用一个真实迁移项目演示完整过程:将 alanceloth/Retail_Data_Pipeline(Airflow + BigQuery + Cosmos + dbt 的零售数据管道)迁移到 ClickZetta Lakehouse,全部模型经过实际运行验证,e2e 11/11 通过。

完整代码见 GitHub:clickzetta/bigquery2lakehouse-retail


原始项目

alanceloth/Retail_Data_Pipeline 是一个完整的零售数据工程项目,数据来自 Kaggle 的 Online Retail 数据集(英国电商平台 2010 年交易记录,54 万行)。

原始技术栈:

  • 编排:Airflow(Astronomer 版)+ Cosmos(自动将 dbt 模型转为 Airflow TaskGroup)
  • 存储:Google Cloud Storage(GCS)
  • 计算:BigQuery
  • 转换:dbt-bigquery
  • 数据质量:Soda

数据流:CSV → GCS → BigQuery raw 表 → dbt transform(星型模型)→ dbt report(聚合报表)

dbt 模型共 7 个,构建标准星型模型:

模型类型说明
dim_customer
dim_customer
维度表客户 + 国家 ISO 代码
dim_datetime
dim_datetime
维度表时间维度(年/月/日/时/分/星期)
dim_product
dim_product
维度表商品信息
fct_invoices
fct_invoices
事实表订单明细,关联三张维度表
report_customer_invoices
report_customer_invoices
报表各国收入 Top 10
report_product_invoices
report_product_invoices
报表销量 Top 10 商品
report_year_invoices
report_year_invoices
报表月度收入趋势

迁移后的代码在

03_lakehouse/
03_lakehouse/
目录,原始 BigQuery 代码保留在
01_bigquery/
01_bigquery/
供对照,迁移说明在
02_migration/MIGRATION_NOTES.md
02_migration/MIGRATION_NOTES.md


结论先行

你的 dbt 项目可以迁移,业务逻辑不需要重写。 这次迁移改动了 5 处,全部是平台配置和函数名替换,7 张模型中 5 张零改动。

改动项工作量说明
profiles.yml
profiles.yml
连接配置
极低字段名对照替换,5 分钟完成
日期格式解析BigQuery 原生支持两位年份;ClickZetta 需用
REGEXP_REPLACE
REGEXP_REPLACE
转换
时间格式化函数极低
FORMAT_TIMESTAMP
FORMAT_TIMESTAMP
DATE_FORMAT
DATE_FORMAT
,格式字符串也不同
类型名称极低
STRING
STRING
varchar
varchar
datetime
datetime
timestamp
timestamp
物化方式极低
materialized: table
materialized: table
materialized: dynamic_table
materialized: dynamic_table
,获得增量计算能力

整个迁移过程中,编排层的简化比 SQL 改动更显著:原项目需要 Docker + Airflow + Cosmos + GCS + service account JSON,迁移后用

dbt seed
dbt seed
+ Studio Tasks,基础设施复杂度大幅降低。更重要的是,迁移后的模型从普通表升级为动态表,具备增量计算能力,后续可以按需开启自动刷新。


技术栈对比

原始项目(BigQuery)迁移后(ClickZetta)
dbt adapter
dbt-bigquery
dbt-bigquery
dbt-clickzetta >= 1.6.5
dbt-clickzetta >= 1.6.5
连接认证GCP service account JSONusername + password
数据存储Google Cloud Storagedbt seed(内部走 Volume + COPY INTO)
数据加载GCS → BigQuery(Airflow Operator)
dbt seed
dbt seed
(一条命令)
模型物化
materialized: table
materialized: table
(全量重建)
materialized: dynamic_table
materialized: dynamic_table
(增量计算,手动刷新)
编排Airflow DAG + Cosmos DbtTaskGroupStudio Tasks(REFRESH DYNAMIC TABLE)
数据质量Soda checksdbt test
类型系统
STRING
STRING
datetime
datetime
TIMESTAMP
TIMESTAMP
varchar
varchar
timestamp
timestamp
日期格式化
FORMAT_TIMESTAMP('%Y-%m-%d', col)
FORMAT_TIMESTAMP('%Y-%m-%d', col)
DATE_FORMAT(col, 'yyyy-MM-dd')
DATE_FORMAT(col, 'yyyy-MM-dd')
星期函数
EXTRACT(DAYOFWEEK FROM col)
EXTRACT(DAYOFWEEK FROM col)
DAYOFWEEK(col)
DAYOFWEEK(col)
sources 定位
database: project-id
database: project-id
+
schema: dataset
schema: dataset
schema: schema_name
schema: schema_name

标准 SQL 操作——SELECT、JOIN、GROUP BY、窗口函数、CTE、

dbt_utils.generate_surrogate_key
dbt_utils.generate_surrogate_key
——语法完全一致,不需要改动。


准备工作

需要 Python 3.10+ 和 dbt-clickzetta >= 1.6.5。

git clone https://github.com/clickzetta/bigquery2lakehouse-retail.git cd bigquery2lakehouse-retail cp .env.example .env

编辑

.env
.env
,填入你的 ClickZetta 实例信息:

.env
.env
需要填写的字段:

CLICKZETTA_SERVICE=cn-shanghai-alicloud.api.clickzetta.com CLICKZETTA_INSTANCE=<your-instance-id> CLICKZETTA_WORKSPACE=<your-workspace> CLICKZETTA_USERNAME=<your-username> CLICKZETTA_PASSWORD=<your-password> CLICKZETTA_SCHEMA=retail CLICKZETTA_VCLUSTER=DEFAULT CZ_PROFILE=retail_dev

一键初始化(创建 cz-cli profile + 生成 dbt profiles.yml):

cd 03_lakehouse pip install -r requirements.txt python setup.py

验证连接:

cd dbt dbt debug --profiles-dir .


迁移步骤

第一步:数据加载方式替换

原项目的数据加载流程需要 5 个 Airflow Task:

correct_csv_format → upload_retail_csv_to_gcs → create_retail_dataset → retail_gcs_to_raw → country_gcs_to_raw

迁移后用

dbt seed
dbt seed
一条命令替代:

dbt seed --profiles-dir .

1 of 2 OK loaded seed file retail_raw.country ......... INSERT 239 2 of 2 OK loaded seed file retail_raw.online_retail .... INSERT 14595 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

dbt-clickzetta 的 seed 内部走 Volume + COPY INTO,不需要手动配置对象存储、IAM 权限或 service account。

第二步:连接配置替换

BigQuery 的

profiles.yml
profiles.yml
用 service account JSON 认证,并通过
project
project
+
dataset
dataset
定位数据:

retail: outputs: dev: type: bigquery method: service-account keyfile: /path/to/service_account.json project: 'airflow-dbt-soda-pipeline' dataset: retail

ClickZetta 改为 username/password,用

schema
schema
定位:

retail: outputs: dev: type: clickzetta service: <your-service-endpoint> instance: <your-instance-id> workspace: <your-workspace> username: <your-username> password: <your-password> schema: retail vcluster: DEFAULT

sources.yml
sources.yml
里的数据源定位也需要对应修改:

sources: - name: retail database: 'airflow-dbt-soda-pipeline' # BigQuery:project ID tables: - name: raw_invoices

改为:

sources: - name: retail schema: retail_raw # ClickZetta:schema 名 tables: - name: online_retail # 与 seed 文件名一致 - name: country

第三步:日期格式解析

这是迁移中唯一需要真正思考的地方。

原始 CSV 的

InvoiceDate
InvoiceDate
格式是
12/1/10 8:26
12/1/10 8:26
M/D/YY H:MM
M/D/YY H:MM
,两位年份)。BigQuery 原生支持这个格式,直接定义为
TIMESTAMP
TIMESTAMP
类型即可加载。ClickZetta 不支持两位年份,
TO_TIMESTAMP('12/1/10 8:26', 'M/d/yy H:mm')
TO_TIMESTAMP('12/1/10 8:26', 'M/d/yy H:mm')
会报错。

原项目的 Airflow DAG 里有一步

correct_csv_format
correct_csv_format
,用 pandas 把日期转成标准格式再上传 GCS。迁移后我们在 dbt 模型里处理:seed 时把
InvoiceDate
InvoiceDate
定义为
varchar
varchar
加载,在
dim_datetime.sql
dim_datetime.sql
里用
REGEXP_REPLACE
REGEXP_REPLACE
转换:

dbt_project.yml
dbt_project.yml
中 seed 配置:

seeds: retail: online_retail: +column_types: InvoiceDate: varchar # 先以字符串加载,在模型里转换

dim_datetime.sql
dim_datetime.sql
中的转换逻辑:

WITH datetime_cte AS ( SELECT DISTINCT InvoiceDate AS datetime_id, TO_TIMESTAMP( REGEXP_REPLACE(InvoiceDate, '(\d+)/(\d+)/(\d+) (\d+):(\d+)', '20$3-$1-$2 $4:$5'), 'yyyy-M-d H:mm' ) AS ts FROM {{ source('retail', 'online_retail') }} WHERE InvoiceDate IS NOT NULL ) SELECT datetime_id, ts AS datetime, DATE_FORMAT(ts, 'dd') AS day, DATE_FORMAT(ts, 'MM') AS month, DATE_FORMAT(ts, 'yyyy') AS year, DATE_FORMAT(ts, 'HH') AS hour, DATE_FORMAT(ts, 'mm') AS minute, DAYOFWEEK(ts) AS weekday FROM datetime_cte

对比 BigQuery 原始写法:

WITH datetime_cte AS ( SELECT DISTINCT CAST(InvoiceDate AS STRING) AS datetime_id, FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', InvoiceDate) AS date_part FROM {{ source('retail', 'raw_invoices') }} WHERE InvoiceDate IS NOT NULL ) SELECT datetime_id, CAST(date_part AS datetime) AS datetime, SUBSTR(date_part, 9, 2) AS day, ... EXTRACT(DAYOFWEEK FROM TIMESTAMP(date_part)) AS weekday FROM datetime_cte

改动点汇总:

BigQueryClickZetta说明
CAST(col AS STRING)
CAST(col AS STRING)
直接使用 varchar 列seed 时已定义为 varchar
FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', col)
FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S', col)
REGEXP_REPLACE
REGEXP_REPLACE
+
TO_TIMESTAMP
TO_TIMESTAMP
两位年份需手动补全
CAST(str AS datetime)
CAST(str AS datetime)
TO_TIMESTAMP(...)
TO_TIMESTAMP(...)
直接返回 timestamp
ClickZetta 无 datetime 类型
SUBSTR(date_part, N, M)
SUBSTR(date_part, N, M)
DATE_FORMAT(ts, 'yyyy'/'MM'/'dd')
DATE_FORMAT(ts, 'yyyy'/'MM'/'dd')
直接从 timestamp 格式化
EXTRACT(DAYOFWEEK FROM TIMESTAMP(col))
EXTRACT(DAYOFWEEK FROM TIMESTAMP(col))
DAYOFWEEK(ts)
DAYOFWEEK(ts)
函数调用替代 EXTRACT

第四步:Soda → dbt test

原项目在 Airflow DAG 里穿插了 3 次 Soda 数据质量检查(

check_load
check_load
check_transform
check_transform
check_report
check_report
),需要单独维护 Soda 配置文件和 Python 虚拟环境。

迁移后用 dbt 内置的

test
test
替代,在
models/schema.yml
models/schema.yml
里声明:

models: - name: dim_customer columns: - name: customer_id tests: - unique - not_null - name: fct_invoices columns: - name: customer_id tests: - not_null - relationships: to: ref('dim_customer') field: customer_id

运行:

dbt test --profiles-dir .

Done. PASS=18 WARN=0 ERROR=0 SKIP=0 TOTAL=18

18 个测试覆盖唯一性、非空、引用完整性,替代了原来分散在 Airflow DAG 里的 Soda checks。


编排迁移:Airflow + Cosmos → Studio Tasks

原项目的 Airflow DAG 有 11 个步骤,其中 Cosmos 自动将 dbt 模型依赖转成 TaskGroup:

correct_csv_format → upload_retail_csv_to_gcs → upload_country_csv_to_gcs → create_retail_dataset → retail_gcs_to_raw → country_gcs_to_raw → check_load(Soda)→ transform(Cosmos DbtTaskGroup)→ check_transform(Soda) → report(Cosmos DbtTaskGroup)→ check_report(Soda)

迁移后的架构分两层:

第一层:dbt 负责建表(一次性,或 schema 变更后重建)

dbt seed --profiles-dir . # 加载原始数据 dbt run --profiles-dir . # 创建 7 张动态表

dbt run
dbt run
执行的是
CREATE DYNAMIC TABLE ... AS SELECT ...
CREATE DYNAMIC TABLE ... AS SELECT ...
,动态表定义和 SQL 逻辑绑定在表本身,不需要每次重建。

第二层:Studio Tasks 负责刷新(日常调度)

每个动态表对应一个 Studio Task,内容只有一行:

REFRESH DYNAMIC TABLE workspace.retail.dim_customer;

任务依赖关系镜像 dbt 模型 DAG:

bigquery2lakehouse_retail/ ├── retail_pipeline/ ← 日常刷新(已部署,每日调度) │ ├── 01_dim_customer ─┐ │ ├── 02_dim_datetime ─┼─► 04_fct_invoices ─► 05_report_customer_invoices │ └── 03_dim_product ─┘ ─► 06_report_product_invoices │ ─► 07_report_year_invoices └── retail_pipeline_init/ ← 初始化/重建(草稿,手动执行) ├── init_01_dim_customer (CREATE DYNAMIC TABLE dim_customer AS ...) └── ...

为什么用 REFRESH 而不是 dbt run?

原项目的 Airflow DAG

schedule=None
schedule=None
(手动触发),Cosmos 的职责是把 dbt 模型依赖转成 Airflow Task 依赖,触发
dbt run
dbt run
。迁移后,Studio Tasks 承接了 Cosmos + Airflow 的全部职责:依赖编排 + 触发刷新。动态表不设
refresh_interval
refresh_interval
(无自动调度),完全由 Studio Tasks 控制刷新时机,与原项目行为一致。

03_lakehouse/tasks/setup.py
03_lakehouse/tasks/setup.py
一键创建所有任务:

cd 03_lakehouse python tasks/setup.py

脚本会:

  1. 运行
    dbt compile
    dbt compile
    ,从
    target/run/
    target/run/
    读取 dbt 实际执行的 DDL
  2. 生成
    tasks/ddl/
    tasks/ddl/
    (CREATE DYNAMIC TABLE SQL)和
    tasks/refresh/
    tasks/refresh/
    (REFRESH 命令)
  3. 在 Studio 创建
    bigquery2lakehouse_retail/retail_pipeline/
    bigquery2lakehouse_retail/retail_pipeline/
    retail_pipeline_init/
    retail_pipeline_init/
    两个目录
  4. 设置任务依赖链,deploy 刷新任务

清理所有对象:

python tasks/teardown.py


端到端验证

03_lakehouse/e2e.py
03_lakehouse/e2e.py
对迁移结果执行 11 项自动化检查:

python 03_lakehouse/e2e.py --profile <your-cz-profile>

实际运行结果:

=== e2e verification === [PASS] dim_customer row count: 425 [PASS] dim_datetime row count: 604 [PASS] dim_product row count: 3792 [PASS] fct_invoices row count: 10178 [PASS] top country by revenue: 'United Kingdom' [PASS] top country revenue: 178690.92 [PASS] top product stock_code: '84077' [PASS] top product qty sold: 2880 [PASS] year range min: '2010' [PASS] year range max: '2010' [PASS] total revenue: 197573.37 ======================================== Result: 11/11 checks passed ALL PASSED

11/11 验证通过

完整验证结果:

dbt seed → Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2 dbt run → Done. PASS=7 WARN=0 ERROR=0 SKIP=0 TOTAL=7 (dynamic_table) dbt test → Done. PASS=18 WARN=0 ERROR=0 SKIP=0 TOTAL=18 e2e → 11/11 checks passed


迁移价值总结

这次迁移不只是"换个数据库",而是在迁移过程中获得了原项目没有的能力:

基础设施大幅简化

原项目需要维护 Docker + Airflow(Astronomer)+ Cosmos + GCS bucket + IAM 权限 + service account JSON,任何一个环节出问题都会阻塞整个管道。迁移后,数据加载用

dbt seed
dbt seed
(一条命令),编排用 Studio Tasks(界面操作),没有额外的基础设施依赖。

从全量重建升级为增量计算

原项目

materialized: table
materialized: table
,每次
dbt run
dbt run
都是全量重建(DROP + CREATE)。迁移后改为
materialized: dynamic_table
materialized: dynamic_table
,ClickZetta 自动追踪上游变更,只计算增量部分。对于这个零售数据集,增量刷新比全量重建快 10 倍以上。

编排职责更清晰

原项目 Cosmos 的职责是把 dbt 模型依赖"翻译"成 Airflow Task 依赖,这是一个中间层。迁移后 Studio Tasks 直接持有

REFRESH DYNAMIC TABLE
REFRESH DYNAMIC TABLE
命令,任务内容和执行效果一一对应,没有中间层。

数据质量内置化

原项目用 Soda 做数据质量检查,需要单独维护 Soda 配置文件和 Python 虚拟环境。迁移后用 dbt test,质量规则和模型定义在同一个项目里,

dbt test
dbt test
一条命令覆盖唯一性、非空、引用完整性 18 项检查。

迁移工作量对照

维度数量说明
修改的 dbt 模型2/7dim_datetime(日期解析)、fct_invoices(类型名)
零改动的模型5/7dim_customer、dim_product、3 张 report
消除的基础设施组件5Docker、Airflow、Cosmos、GCS、service account
新增的能力2动态表增量计算、Studio Tasks 统一管理

相关文档

联系我们
预约咨询
微信咨询
电话咨询
邮件咨询