将数据导入云器Lakehouse的完整指南
数据入仓:通过Zettapark以SAVE_AS_TABLE方式加载数据
概述
使用场景
SAVE_AS_TABLE方式会自动建表,从而简化了通过Zettapark以SQL INSERT方式加载数据需要手工建表的过程,同时SAVE_AS_TABLE会自动优化INSERT INTO,每次不是插入一条记录而是多条。
实现步骤
在电脑上打开VS Code,创建一个名为 py_zettapark_save_as_table.py 的文件,并将以下代码复制到py_zettapark_save_as_table.py文件中。
import json
import gzip
from clickzetta.zettapark.session import Session
from datetime import datetime
# 从配置文件中读取参数
with open('config-ingest.json', 'r') as config_file:
config = json.load(config_file)
print("正在连接到云器Lakehouse.....\n")
# 创建会话
session = Session.builder.configs(config).create()
print("连接成功!...\n")
target_table_name = "lift_tuckets_import_by_py_save_as_table"
def save_as_table_to_clickzetta(session, schema, data):
print('Saving data to Clickzetta Lakehouse')
# Convert data to dataframe
df = session.create_dataframe(data, schema=schema)
# Save dataframe as table
df.write.save_as_table(target_table_name, mode="overwrite", table_type="transient")
print(f"Data saved to table {target_table_name}")
if __name__ == "__main__":
schema = None
data = []
# 打开压缩的 JSON 文件并读取内容
with gzip.open('lift_tickets_data.json.gz', 'rt', encoding='utf-8') as file:
for message in file:
if message.strip(): # 确保不是空行
record = json.loads(message)
if 'schema' in record:
schema = record['schema']
else:
data.append(record)
save_as_table_to_clickzetta(session, schema, data)
session.close()
print("Ingest complete")
在VS Code里新建一个“终端”,并运行如下命令激活在“环境设置”步骤创建的Python环境。如果已在cz-ingest-examples环境里,请跳过。
conda activate cz-ingest-examples
然后在同一终端里运行如下命令:
python py_zettapark_save_as_table.py
下一步建议
资料
Zettapark快速上手