dbt ClickZetta Adapter User Guide


Installation

Requires Python 3.10+ (3.12 recommended) and dbt-core 1.8+.

pip install "dbt-clickzetta>=1.7.8"

Connection configuration

Configure the connection details in profiles.yml:

    my_project:
      target: dev
      outputs:
        dev:
          type: clickzetta
          service: cn-shanghai-alicloud.api.clickzetta.com
          instance: your_instance
          workspace: your_workspace
          username: your_username
          password: your_password
          schema: your_schema
          vcluster: DEFAULT

| Parameter | Required | Description |
| --- | --- | --- |
| type | | Always clickzetta |
| service | | API address, e.g. cn-shanghai-alicloud.api.clickzetta.com |
| instance | | Instance name |
| workspace | | Workspace name |
| username | | Username |
| password | | Password |
| schema | | Default schema name |
| vcluster | | Compute cluster name, e.g. DEFAULT |
| connect_retries | | Number of connection retries; defaults to 3 |

Verify the connection:

dbt debug


Supported features

| Feature | Support |
| --- | --- |
| table materialization | |
| view materialization | |
| incremental materialization | |
| ephemeral materialization | |
| snapshot (SCD Type 2) | |
| dynamic_table materialization | |
| materialized_view materialization | |
| dbt test (generic + singular) | |
| dbt seed | |
| dbt docs generate | ✅ (includes row count, size, and last-modified time) |
| dbt source freshness | |
| persist_docs (relation + columns) | |
| Partitioned tables | |
| Bucketed tables | |
| Python models | ❌ Not supported; SQL models only |
| on_schema_change | ✅ (append_new_columns, sync_all_columns) |
| grants | |
| clone materialization | ✅ (zero-copy clone + Time Travel clone) |
| Indexes (Bloomfilter / inverted / vector) | ✅ (created automatically via the indexes config) |
| Table Stream as source | ✅ (declare in sources.yml, reference with source()) |
| VCluster per-model | ✅ (via the vcluster config) |
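
The feature list above includes singular tests. As a minimal sketch, a singular test is a SQL file under tests/ that selects the rows that should count as failures. The file name and the amount check here are hypothetical; stg_orders is borrowed from the examples elsewhere in this guide.

```sql
-- tests/assert_no_negative_amounts.sql (hypothetical file name)
-- A singular test passes when this query returns zero rows.
-- Assumes stg_orders exposes order_id and amount columns.
select
    order_id,
    amount
from {{ ref('stg_orders') }}
where amount < 0
```

Running dbt test picks up this file alongside any generic tests declared in YAML.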

Incremental strategies

Four incremental strategies are supported:

| Strategy | Description |
| --- | --- |
| merge (default) | MERGE INTO; requires unique_key |
| append | INSERT INTO; no deduplication |
| insert_overwrite | INSERT OVERWRITE in dynamic partition mode |
| delete+insert | Deletes the rows matching unique_key, then inserts (unique_key is required) |

    {{ config(
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='id'
    ) }}

    select *
    from {{ ref('stg_orders') }}
    {% if is_incremental() %}
    where updated_at >= (select max(updated_at) from {{ this }})
    {% endif %}
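
The feature list states that on_schema_change supports append_new_columns and sync_all_columns. The following is a hedged sketch of how that setting might be combined with the merge strategy, using the standard dbt on_schema_change config key. The model and column names simply reuse the example above and are illustrative only.

```sql
-- Sketch: incremental merge model that adds newly appearing columns
-- to the target table when the model's schema changes.
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
    on_schema_change='append_new_columns'
) }}

select *
from {{ ref('stg_orders') }}
{% if is_incremental() %}
-- Only pick up rows changed since the last run.
where updated_at >= (select max(updated_at) from {{ this }})
{% endif %}
```

In dbt, append_new_columns adds new columns but does not drop columns that disappear from the model, while sync_all_columns also removes columns that are no longer present.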


Dynamic Table

A Dynamic Table refreshes incrementally and automatically at the configured refresh_interval, so no external scheduler is needed:

    {{ config(
        materialized='dynamic_table',
        refresh_interval='5 MINUTE',
        refresh_vc='DEFAULT'
    ) }}

    select
        customer_id,
        count(order_id) as order_count,
        sum(amount) as total_amount
    from {{ ref('stg_orders') }}
    group by customer_id

To trigger an immediate refresh manually:

dbt run-operation refresh_dynamic_table --args '{model_name: my_dynamic_table}'


Indexes

Indexes are created automatically when the table is built. Three types are supported: Bloomfilter (equality lookups), inverted (full-text search), and vector (similarity search):

    {{ config(
        materialized='table',
        indexes=[
            {'type': 'bloomfilter', 'columns': ['order_id']},
            {'type': 'inverted', 'columns': ['status'], 'analyzer': 'unicode'},
            {'type': 'vector', 'columns': ['embedding'], 'distance_function': 'cosine_distance', 'scalar_type': 'f32'}
        ]
    ) }}


VCluster per-model

Assign a compute cluster to an individual model to isolate resources between large and small models:

    {{ config(
        materialized='incremental',
        incremental_strategy='delete+insert',
        unique_key='order_id',
        vcluster='large_ap'
    ) }}

You can also set it in bulk, per directory, in dbt_project.yml:

    models:
      my_project:
        marts:
          +vcluster: large_ap
        staging:
          +vcluster: DEFAULT_AP


Table Stream as source

Declare the Table Stream in sources.yml, then reference it in a model with source() to consume CDC change data:

    sources:
      - name: my_streams
        schema: my_schema
        tables:
          - name: orders_stream

    {{ config(materialized='incremental', incremental_strategy='append') }}

    select
        `__change_type` as cdc_change_type,
        `__commit_timestamp` as cdc_commit_ts,
        order_id,
        customer_id,
        amount
    from {{ source('my_streams', 'orders_stream') }}


Clone

Zero-copy clone, suited to CI/CD environment isolation or to quickly creating test copies:

    {{ config(
        materialized='clone',
        source='my_schema.fct_orders'
    ) }}

Time Travel clone (restores the table to a historical point in time):

    {{ config(
        materialized='clone',
        source='my_schema.fct_orders',
        at_timestamp="current_timestamp() - interval 1 hours"
    ) }}


Snapshot (SCD Type 2)

SCD Type 2 is implemented with MERGE INTO; Delta/Iceberg is not required:

    {% snapshot orders_snapshot %}

    {{ config(
        target_schema='snapshots',
        unique_key='order_id',
        strategy='timestamp',
        updated_at='updated_at'
    ) }}

    select * from {{ source('raw', 'orders') }}

    {% endsnapshot %}
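
When a source has no reliable updated_at column, dbt's check strategy compares a list of columns instead. The sketch below assumes the adapter handles strategy='check' the same way it handles the timestamp strategy, which this guide does not state. The snapshot name and the status and amount columns are hypothetical.

```sql
{% snapshot orders_snapshot_check %}

{{ config(
    target_schema='snapshots',
    unique_key='order_id',
    strategy='check',
    check_cols=['status', 'amount']
) }}

-- A new snapshot row is recorded when status or amount changes
-- for an existing order_id.
select * from {{ source('raw', 'orders') }}

{% endsnapshot %}
```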


Utility Macros

Invoke the built-in macros with dbt run-operation:

    # Compact small files (use after frequent incremental writes)
    dbt run-operation optimize_table --args '{relation: my_schema.my_table}'
    dbt run-operation optimize_table --args '{relation: my_schema.my_table, where: "dt >= current_date() - interval 7 days"}'

    # Switch the VCluster for the current session
    dbt run-operation use_vcluster --args '{vcluster: large_ap}'

    # List dropped objects that can still be restored
    dbt run-operation show_tables_history --args '{schema: my_schema}'

    # Restore a dropped object (table / dynamic_table / materialized_view / stream)
    dbt run-operation undrop --args '{relation: my_schema.my_table}'

    # Drop an object
    dbt run-operation drop_object --args '{relation: my_schema.my_table, type: table}'

    # Refresh a Dynamic Table manually
    dbt run-operation refresh_dynamic_table --args '{model_name: my_dynamic_table}'

