使用Pipe持续导入数据
概述
Pipe 管道是云器 Lakehouse 用于流式数据采集的对象类型。使用 Pipe 可以持续增量地采集流式数据(如来自 Kafka),简化流式数据导入流程。
Pipe 管道在定义时使用 COPY 命令表达读取外部数据源并写入目标表的语义。与单独的 COPY 命令的主要区别在于,Pipe 将自动持续调度 COPY 任务,维护和管理数据源的读取位置,持续不断地从数据源增量导入数据。
使用 Pipe 进行流式数据自动导入的总体逻辑示意如下:
)
数据源支持
Pipe 管道提供了对数据源新变化数据的持续采集能力,当前数据源包括:
- Kafka数据源
- 对象存储(如阿里云OSS)
管理使用Pipe
Lakehouse 提供了一组操作命令用于管理 Pipe 管道:
CREATE PIPEDESC PIPESHOW PIPESDROP PIPE
创建Pipe对象
创建Pipe对象读取Kafka数据源
可使用SQL命令创建Pipe对象,语法如下:
参数说明:
-
VIRTUAL_CLUSTER:Pipe 提交 COPY 作业所用的 VC。必填项。 -
BATCH_INTERVAL_IN_SECONDS:作业生成周期。可选参数,默认 60 秒。 -
BATCH_SIZE_PER_KAFKA_PARTITION:作业最大单分区消息数。可选参数,默认 50 万。 -
COMMENT:添加注释。可选参数。 -
COPY_STATEMENT:使用COPY INTO<table> 将数据导入到目标表。- 对于 Kafka 数据源,在 SELECT 语句中将使用
read_kafka表值函数读取 Kafka 消息数据。函数的参数说明请查看read_kafka函数说明。
- 对于 Kafka 数据源,在 SELECT 语句中将使用
创建Pipe读取对象存储数据源
读取对象存储需要创建 VOLUME,同时需要开通阿里云消息服务。消息服务用于向 Pipe 发送对象存储事件,以触发 Pipe 执行。
参数说明:
- VIRTUAL_CLUSTER:Pipe 提交 COPY 作业所用的 VC。必填项。
ALICLOUD_MNS_QUEUE:消息队列名称。消息服务 MNS 可以将对象存储指定资源上产生的事件以消息的方式主动推送到指定的接收端。Pipe 接收到消息队列事件后触发执行,获取哪些文件是新增文件,然后触发COPY命令将数据导入到表中。
查看Pipe列表及对象详情
当前您可以使用 SQL 命令查看 Pipe 列表及对象详情。
- 使用
SHOW PIPES命令查看 Pipe 对象列表
说明
-
SHOW PIPES:默认列出当前 Schema 下的 Pipe 对象。 -
SHOW PIPES IN SCHEMA schema_name:列出指定 Schema 中的所有 Pipe 对象。 -
SHOW PIPES IN WORKSPACE workspace_name:列出指定 Workspace 中的所有 Pipe 对象。 -
使用
DESC PIPE命令查看指定 Pipe 对象详细信息
删除Pipe对象
您可以使用 SQL 命令删除 Pipe 对象。
约束与限制
- 数据源是 Kafka 时:一个 Pipe 中只能有一个
read_kafka函数。 - 数据源是对象存储时:一个 Pipe 中只能有一个 Volume 对象。
