将数据导入云器Lakehouse的完整指南

数据入仓:通过Zettapark以SAVE_AS_TABLE方式加载数据

概述

使用场景

SAVE_AS_TABLE方式会自动建表,从而简化了通过Zettapark以SQL INSERT方式加载数据需要手工建表的过程,同时SAVE_AS_TABLE会自动优化INSERT INTO,每次不是插入一条记录而是多条。

实现步骤

在电脑上打开VS Code,创建一个名为 py_zettapark_save_as_table.py 的文件,并将以下代码复制到py_zettapark_save_as_table.py文件中。

import json
import gzip
from clickzetta.zettapark.session import Session
from datetime import datetime

# 从配置文件中读取参数
with open('config-ingest.json', 'r') as config_file:
    config = json.load(config_file)

print("正在连接到云器Lakehouse.....\n")

# 创建会话
session = Session.builder.configs(config).create()

print("连接成功!...\n")

target_table_name = "lift_tuckets_import_by_py_save_as_table"

def save_as_table_to_clickzetta(session, schema, data):
    print('Saving data to Clickzetta Lakehouse')

    # Convert data to dataframe
    df = session.create_dataframe(data, schema=schema)
    
    # Save dataframe as table
    df.write.save_as_table(target_table_name, mode="overwrite", table_type="transient")
    print(f"Data saved to table {target_table_name}")

if __name__ == "__main__":
    schema = None
    data = []

    # 打开压缩的 JSON 文件并读取内容
    with gzip.open('lift_tickets_data.json.gz', 'rt', encoding='utf-8') as file:
        for message in file:
            if message.strip():  # 确保不是空行
                record = json.loads(message)
                if 'schema' in record:
                    schema = record['schema']
                else:
                    data.append(record)
    
    save_as_table_to_clickzetta(session, schema, data)
    session.close()
    print("Ingest complete")

在VS Code里新建一个“终端”,并运行如下命令激活在“环境设置”步骤创建的Python环境。如果已在cz-ingest-examples环境里,请跳过。

conda activate cz-ingest-examples

然后在同一终端里运行如下命令:

python py_zettapark_save_as_table.py

下一步建议

资料

Zettapark快速上手

联系我们
预约咨询
微信咨询
电话咨询