使用Pipe持续导入数据

概述

Pipe管道是云器Lakehouse用于流式数据采集的对象类型。使用Pipe可以持续增量地采集流式数据(如来自于Kafka),简化流式数据导入流程。

Pipe管道在定义时使用 COPY 命令表达读取外部数据源并写入目标表的语义,与单独的COPY命令的主要区别在于Pipe将自动持续调度COPY任务,维护和管理数据源的读取位置,持续不断从数据源增量导入数据。

使用Pipe进行流式数据自动导入的总体逻辑示意如下:

数据源支持

Pipe管道提供了对数据源新变化数据的持续采集能力,当前数据源包括:

  • Kafka数据源
  • 对象存储阿里云oss

管理使用Pipe

Lakehouse提供了一组操作命令用于管理Pipe管道:

  • CREATE PIPE
  • DESC PIPE
  • SHOW PIPES
  • DROP PIPE

创建Pipe对象

创建PIpe对象读取Kafka数据源

可使用SQL命令创建Pipe对象,语法如下:

CREATE PIPE [ IF NOT EXISTS ] <name>
  VIRTUAL_CLUSTER = '<virtual_cluster_name>'
  [ BATCH_INTERVAL_IN_SECONDS = '<number>' ]
  [ BATCH_SIZE_PER_KAFKA_PARTITION = '<number>' ]
  [ COMMENT '<string_literal>' ]
  AS <copy_statement>

参数说明:

  • VIRTUAL_CLUSTER:pipe 提交 copy 作业所用 VC。必填项。

  • BATCH_INTERVAL_IN_SECONDS:作业生成周期。可选参数,默认 60 秒。

  • BATCH_SIZE_PER_KAFKA_PARTITION:作业最大单分区消息数,可选参数,默认 50 万。

  • COMMENT:添加注释。可选参数。

  • COPY_STATEMENT:使用COPY INTO <table>用来导入数据到目标表。

    • 对于Kafka数据源,在SELECT语句中将使用read_kafka表值函数读取Kafka消息数据。函数的参数说明请查看read_kafka函数说明。

创建Pipe读取对象存储数据源

读取对象存储需要创建VOLUME同时需要开通阿里云消息服务。消息服务用来向Pipe发送对象存储事件触发Pipe执行

--创建pipe,其中ALICLOUD_MNS_QUEUE使用,刚刚创建的云消息服务mns队列lakehouse-oss-event-queue
CREATE PIPE [ IF NOT EXISTS ] <name>
  VIRTUAL_CLUSTER = '<virtual_cluster_name>'
ALICLOUD_MNS_QUEUE = 'MNSQUEUE'
as
AS <copy_statement>

参数说明:

  • VIRTUAL_CLUSTER:pipe 提交 copy 作业所用 VC。必填项。
  • ALICLOUD_MNS_QUEUE:消息队列名称。消息服务MNS可以将对象存储指定资源上产生的事件以消息的方式主动推送到指定的接收端。Pipe接受到消息队列事件触发执行获取哪些文件是新增文件,然后触发COPY命令导入到表中

查看Pipe列表及对象详情

当前您可以使用SQL命令查看Pipe列表及对象详情。

  • 使用SHOW PIPES命令查看PIPE对象列表
SHOW PIPES;
  • 使用DESC PIPE命令查看指定PIPE对象详细信息
DESC PIPE <name>;

删除Pipe对象

当前您可以使用SQL命令删除Pipe对象。

DROP PIPE <name>;

约束与限制

  • 数据源是kafka时:一个pipe中只能有一个read_kafka函数
  • 数据源是对象存储时:一个pipe中只能有一个volume对象

联系我们
预约咨询
微信咨询
电话咨询