READ_KAFKA

功能描述

READ_KAFKA 函数用于从 Apache Kafka 集群一次性读取数据并以表格形式返回。该函数主要用于数据探查、测试和临时查询场景,直接SELECT查询不会在 Kafka 中创建持久的消费者组。

语法

SELECT ... FROM READ_KAFKA(
 'bootstrap',
 'topic',
'topic_pattern',
 'group_id', 
'STARTING_OFFSETS', 'ENDING_OFFSETS', 'STARTING_OFFSETS_TIMESTAMP', 'ENDING_OFFSETS_TIMESTAMP', 
'KEY_FORMAT', 
'VALUE_FORMAT', 
   0,
MAP()      
)

read_kafka 函数用于从Kafka读取数据。它支持以下参数:

  • bootstrap: Kafka服务器地址,如 1.2.3.1:9092,1.2.3.2:9092
  • topic: Kafka主题名称,多个主题用逗号分隔,如 topicA,topicB
  • topic_patternt*:topic正则,暂不支持,默认留空。如:''。*
  • group_id: Kafka消费者组ID。临时消费者组 ID,仅用于函数执行期间不会在 Kafka 中创建持久消费者组
  • STARTING_OFFSETS: 指定读取的起始点位,默认为 earliest,在读取时推荐填写。
  • ENDING_OFFSETS: 指定结束点位,默认为 latest,在读取时推荐填写。
  • STARTING_OFFSETS_TIMESTAMP: 指定起始点位的时间戳。
  • ENDING_OFFSETS_TIMESTAMP: 指定结束点位的时间戳。
  • KEY_FORMAT:指定读取key的格式,类型是STRING类型忽略大小写。目前只支持raw格式
  • VALUE_FORMAT:指定读取value的格式,类型是STRING类型忽略大小写。目前只支持raw格式
  • MAX_ERROR_NUMBER:读取窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行,取值范围0-100000
  • MAP():需要传入到Kafka的参数,以kafka.开头,直接使用kafka的参数即可,可以在Kafka中找到这种选项。,格式如MAP('kafka.security.protocol', 'PLAINTEXT'),取值参考,kafka文档

read_kafka 结果返回值:

字段含义类型
topicKafka主题名称STRING
partition数据分区IDINT
offsetKafka分区中的偏移量BIGINT
timestampKafka消息时间戳TIMESTAMP_LTZ
timestamp_typeKafka消息时间戳类型STRING
headersKafka消息头MAP<STRING, BINARY>
keyKafka的key值BINARY
valueKafka的value值BINARY

注意事项

  • 使用read_kafka时请确保和Lakehouse网络打通

具体案例

-- 读取指定时间范围内的数据
SELECT 
    topic, 
    partition, 
    offset, 
    timestamp,
    CAST(key AS STRING) AS key_str,
    CAST(value AS STRING) AS value_str
FROM READ_KAFKA(
    'kafka-broker:9092',
    'order_events',
    '',
    'temp_analysis_group',
    'earliest',                               -- 从最早开始
    'latest',                                 -- 读取到最新
    '1640995200000',                          -- 2022-01-01 00:00:00
    '1641081600000',                          -- 2022-01-02 00:00:00
    'raw',
    'raw',
     0,
MAP() 

)
LIMIT10;

联系我们
预约咨询
微信咨询
电话咨询