Pipe

Pipe 是 Lakehouse 的持续数据导入对象,通过 SQL DDL 创建后自动运行,持续从对象存储(OSS/COS/S3)或 Kafka 读取数据并写入目标表。

详细介绍参见 Pipe 对象模型


本章内容

页面说明
CREATE PIPE创建对象存储 Pipe 或 Kafka Pipe
ALTER PIPE暂停、恢复 Pipe,或修改批处理间隔等属性
DROP PIPE删除 Pipe(不影响目标表数据)
SHOW PIPES列出当前 Schema 下的所有 Pipe
SHOW CREATE PIPE查看 Pipe 的创建语句
DESC PIPE查看 Pipe 详情,包括状态、来源、目标和延迟

常用操作

创建对象存储 Pipe

-- LIST_PURGE 模式:定期轮询,导入后删除源文件 CREATE PIPE orders_pipe VIRTUAL_CLUSTER = 'DEFAULT' INGEST_MODE = 'LIST_PURGE' AS COPY INTO orders FROM VOLUME orders_vol USING CSV OPTIONS('header' = 'true');

创建 Kafka Pipe

CREATE PIPE kafka_orders_pipe VIRTUAL_CLUSTER = 'DEFAULT' BATCH_INTERVAL_IN_SECONDS = '60' AS COPY INTO orders_raw FROM ( SELECT CAST(value AS STRING) AS raw_msg FROM TABLE(READ_KAFKA( 'kafka-host:9092', 'orders_topic', '', 'pipe_orders_group', '', '', '', '', 'raw', 'raw', 0, map() )) );

暂停与恢复

-- 暂停 ALTER PIPE orders_pipe SET PIPE_EXECUTION_PAUSED = TRUE; -- 恢复 ALTER PIPE orders_pipe SET PIPE_EXECUTION_PAUSED = FALSE; -- 立即触发一次扫描 ALTER PIPE orders_pipe REFRESH;

查看与删除

-- 查看所有 Pipe SHOW PIPES; -- 查看 Pipe 详情 DESC PIPE orders_pipe; -- 删除 Pipe DROP PIPE orders_pipe;


相关文档

文档说明
Pipe 对象模型核心概念、两种模式对比、去重机制、完整参数说明
对象存储 Pipe 详细配置EVENT_NOTIFICATION 模式完整配置
Kafka Pipe 详细配置READ_KAFKA 参数详解、消费位点管理
实时数据管道选型指南Pipe / Stream / 动态表 选型对比
联系我们
预约咨询
微信咨询
电话咨询
邮件咨询