DBT ClickZetta adapter 使用指南
💡 如果你使用 dbt 的目的是在云器 Lakehouse 上做数据转换和建模 ,云器 Studio 提供了原生的开发、编排和运维能力,无需维护独立的 dbt 项目:
第一步:在 Studio SQL 任务中开发转换逻辑
Studio SQL 任务 是云器原生的数据建模方式,支持在 IDE 中编写 SQL 并调度执行。根据转换场景选择合适的对象类型:
场景 对象类型 说明 需要自动增量刷新的转换逻辑 Dynamic Table(动态表) 声明式定义转换 SQL,系统自动增量计算,无需手写调度逻辑 高频固定聚合查询加速 物化视图 预计算结果物理存储,BI 查询直接命中 普通 SQL ETL 逻辑 普通表 + INSERT/MERGE 标准 SQL 写入,灵活控制执行时机
第二步:将多个 SQL 任务编排为工作流
多个 SQL 任务可以组合成 工作流(DAG) ,定义任务间的依赖关系,统一调度触发。
第三步:调度、监控和运维
Studio 内置调度系统,支持 Cron 定时触发、任务依赖、失败告警、运行日志查看和失败重跑,详见 深入使用 Studio 。
如果你已有 dbt 项目需要迁移,或团队习惯 dbt 的开发方式,继续往下看接入指南。
安装
需要 Python 3.10+(推荐 3.12)和 dbt-core 1.8+。
pip install "dbt-clickzetta>=1.7.8"
连接配置
在
profiles.ymlprofiles.yml
里配置连接信息:
my_project:
target: dev
outputs:
dev:
type: clickzetta
service: cn-shanghai-alicloud.api.clickzetta.com
instance: your_instance
workspace: your_workspace
username: your_username
password: your_password
schema: your_schema
vcluster: DEFAULT
参数 必填 说明 typetype
是 固定为 clickzettaclickzetta
serviceservice
是 API 地址,如 cn-shanghai-alicloud.api.clickzetta.comcn-shanghai-alicloud.api.clickzetta.com
instanceinstance
是 实例名称 workspaceworkspace
是 工作空间名称 usernameusername
是 用户名 passwordpassword
是 密码 schemaschema
是 默认 Schema 名称 vclustervcluster
是 计算集群名称,如 DEFAULTDEFAULT
connect_retriesconnect_retries
否 连接重试次数,默认 3
验证连接:
dbt debug
支持的功能
功能 支持情况 tabletable
materialization✅ viewview
materialization✅ incrementalincremental
materialization✅ ephemeralephemeral
materialization✅ snapshotsnapshot
(SCD Type 2)✅ dynamic_tabledynamic_table
materialization✅ materialized_viewmaterialized_view
materialization✅ dbt testdbt test
(generic + singular)✅ dbt seeddbt seed
✅ dbt docs generatedbt docs generate
✅(含行数、大小、最后修改时间) dbt source freshnessdbt source freshness
✅ persist_docspersist_docs
(relation + columns)✅ 分区表 ✅ 分桶表 ✅ Python models ❌ 不支持,仅支持 SQL 模型 on_schema_changeon_schema_change
✅(append_new_columns、sync_all_columns) grantsgrants
✅ cloneclone
materialization✅(零拷贝克隆 + Time Travel 克隆) 索引(Bloomfilter / 倒排 / 向量) ✅(通过 indexesindexes
config 自动创建) Table Stream as source ✅(在 sources.ymlsources.yml
声明,source()source()
引用) VCluster per-model ✅(通过 vclustervcluster
config)
增量策略
支持 4 种增量策略:
策略 说明 mergemerge
(默认)MERGE INTO,需要 unique_keyunique_key
appendappend
INSERT INTO,不去重 insert_overwriteinsert_overwrite
INSERT OVERWRITE,动态分区模式 delete+insertdelete+insert
先按 unique_keyunique_key
删除匹配行再插入(unique_keyunique_key
必填)
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='id'
) }}
select * from {{ ref('stg_orders') }}
{% if is_incremental() %}
where updated_at >= (select max(updated_at) from {{ this }})
{% endif %}
Dynamic Table
Dynamic Table 按
refresh_intervalrefresh_interval
自动增量刷新,无需外部调度:
{{ config(
materialized='dynamic_table',
refresh_interval='5 MINUTE',
refresh_vc='DEFAULT'
) }}
select
customer_id,
count(order_id) as order_count,
sum(amount) as total_amount
from {{ ref('stg_orders') }}
group by customer_id
手动触发立即刷新:
dbt run-operation refresh_dynamic_table --args '{model_name: my_dynamic_table}'
索引
建表时自动创建索引,支持 Bloomfilter(等值查询)、倒排(全文搜索)、向量(相似度搜索)三种类型:
{{ config(
materialized='table',
indexes=[
{'type': 'bloomfilter', 'columns': ['order_id']},
{'type': 'inverted', 'columns': ['status'], 'analyzer': 'unicode'},
{'type': 'vector', 'columns': ['embedding'],
'distance_function': 'cosine_distance', 'scalar_type': 'f32'}
]
) }}
VCluster per-model
为单个模型指定计算集群,实现大小模型资源隔离:
{{ config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='order_id',
vcluster='large_ap'
) }}
也可以在
dbt_project.ymldbt_project.yml
里按目录批量配置:
models:
my_project:
marts:
+vcluster: large_ap
staging:
+vcluster: DEFAULT_AP
Table Stream as source
在
sources.ymlsources.yml
里声明 Table Stream,在模型里用
source()source()
引用,消费 CDC 变更数据:
sources:
- name: my_streams
schema: my_schema
tables:
- name: orders_stream
{{ config(materialized='incremental', incremental_strategy='append') }}
select
`__change_type` as cdc_change_type,
`__commit_timestamp` as cdc_commit_ts,
order_id, customer_id, amount
from {{ source('my_streams', 'orders_stream') }}
⚠️ 注意 :
__change_type__change_type
、
__commit_timestamp__commit_timestamp
、
__commit_version__commit_version
是 Table Stream 系统列,引用时需要用 backtick 包裹。消费时推荐用
SELECT * EXCEPT(__change_type, __commit_timestamp, __commit_version)SELECT * EXCEPT(__change_type, __commit_timestamp, __commit_version)
过滤系统列。
Clone
零拷贝克隆,适合 CI/CD 环境隔离或快速创建测试副本:
{{ config(
materialized='clone',
source='my_schema.fct_orders'
) }}
Time Travel 克隆(恢复到历史时间点):
{{ config(
materialized='clone',
source='my_schema.fct_orders',
at_timestamp="current_timestamp() - interval 1 hours"
) }}
Snapshot(SCD Type 2)
通过 MERGE INTO 实现 SCD Type 2,无需 Delta/Iceberg:
{% snapshot orders_snapshot %}
{{ config(
target_schema='snapshots',
unique_key='order_id',
strategy='timestamp',
updated_at='updated_at'
) }}
select * from {{ source('raw', 'orders') }}
{% endsnapshot %}
Utility Macros
通过
dbt run-operationdbt run-operation
调用内置 macro:
# 压缩小文件(高频增量写入后使用)
dbt run-operation optimize_table --args '{relation: my_schema.my_table}'
dbt run-operation optimize_table --args '{relation: my_schema.my_table, where: "dt >= current_date() - interval 7 days"}'
# 切换当前 session 的 VCluster
dbt run-operation use_vcluster --args '{vcluster: large_ap}'
# 查看可恢复的已删除对象
dbt run-operation show_tables_history --args '{schema: my_schema}'
# 恢复已删除对象(table / dynamic_table / materialized_view / stream)
dbt run-operation undrop --args '{relation: my_schema.my_table}'
# 删除对象
dbt run-operation drop_object --args '{relation: my_schema.my_table, type: table}'
# 手动刷新 Dynamic Table
dbt run-operation refresh_dynamic_table --args '{model_name: my_dynamic_table}'
相关文档