将数据导入云器Lakehouse的完整指南

数据入仓:通过Zettapark以SQL INSERT方式加载数据

概述

云器Lakehouse提供了和PySpark兼容的的Zettapark,在流行的IDE(比如VS Code)里通过Python和SQL编程的方式,将数据加载到云器Lakehouse的表里。

使用场景

这种方式可以方便地运行 SQL 并上传文件,加载数据的一种方法是对每条记录执行 SQL INSERT 语句,适合在Python编程环境里少量数据的上传。虽然这是一种方便的数据插入方式,但是对大批量数据的加载效率不高,因为云器Lakehouse并不是一个传统的数据库,而是针对写入大批量数据进行了更多优化。

实现步骤

在电脑上打开VS Code,创建一个名为 py_zettapark_sql_insert.py 的文件,并将以下代码复制到py_zettapark_sql_insert.py文件中。

import json,gzip
from clickzetta.zettapark.session import Session
from datetime import datetime

# 从配置文件中读取参数
with open('config-ingest.json', 'r') as config_file:
    config = json.load(config_file)

print("正在连接到云器Lakehouse.....\n")

# 创建会话
session = Session.builder.configs(config).create()

print("连接成功!...\n")

target_table_name = "lift_tuckets_import_by_py_insert"

create_target_table_query = f"""
CREATE TABLE if not exists ql_ws.ingest.{target_table_name}(
  `txid` string,
  `rfid` string,
  `resort` string,
  `purchase_time` timestamp_ltz,
  `expiration_time` date,
  `days` int,
  `name` string,
  `address_street` string,
  `address_city` string,
  `address_state` string,
  `address_postalcode` string,
  `phone` string,
  `email` string,
  `emergency_contact_name` string,
  `emergency_contact_phone` string)
"""
session.sql(create_target_table_query).collect()

def save_to_clickzetta(session, message):
    record = json.loads(message)
    print('inserting record to Clickzetta Lakehouse')

    # 转换日期和时间字段
    purchase_time = datetime.strptime(record['purchase_time'], '%Y-%m-%d %H:%M:%S')
    expiration_time = datetime.strptime(record['expiration_time'], '%Y-%m-%d').date()

    row = (
        f"'{record['txid']}'", f"'{record['rfid']}'", f"'{record['resort']}'", 
        f"timestamp_ltz '{record['purchase_time']}'", f"date '{record['expiration_time']}'", 
        record['days'], f"'{record['name']}'", f"'{record['address_street']}'", 
        f"'{record['address_city']}'", f"'{record['address_state']}'", 
        record['address_postalcode'], record['phone'], 
        f"'{record['email']}'", f"'{record['emergency_contact_name']}'", 
        record['emergency_contact_phone']
    )

    sql_query = f"""
    INSERT INTO ql_ws.ingest.{target_table_name} 
    (TXID, RFID, RESORT, PURCHASE_TIME, EXPIRATION_TIME, DAYS, NAME, ADDRESS_STREET, ADDRESS_CITY, ADDRESS_STATE, ADDRESS_POSTALCODE, PHONE, EMAIL, EMERGENCY_CONTACT_NAME, EMERGENCY_CONTACT_PHONE) 
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    session.sql(sql_query, row).collect()
    print(f"inserted ticket {record}")

if __name__ == "__main__":
    # 打开 JSON 文件并读取内容
    with gzip.open('lift_tickets_data.json.gz', 'rt', encoding='utf-8') as file:
        for message in file:
            if message.strip():  # 确保不是空行
                save_to_clickzetta(session, message)
    
    session.close()
    print("Ingest complete")

在VS Code里新建一个“终端”,并运行如下命令激活在“环境设置”步骤创建的Python环境。如果已在cz-ingest-examples环境里,请跳过。

conda activate cz-ingest-examples

然后在同一终端里运行如下命令:

python py_zettapark_sql_insert.py

下一步建议

优化:对于加载大批量数据而言,这并不是一个高效的方法。你可以通过提高任务并发度,以及在每次INSERT INTO插入多条记录来提高性能。

资料

SQL Insert Into

Zettapark快速上手

联系我们
预约咨询
微信咨询
电话咨询