数据导入Lakehouse操作实践_通过Python脚本
在现代数据处理中,将数据导入数据库是一个常见的需求。使用Python脚本实现数据导入,可以带来许多优势,适用于多种场景,例如自动化、数据处理、多种数据库支持等。本文将以PostgreSQL数据库为例,演示如何将数据导入云器Lakehouse。
优势介绍
-
自动化:通过创建定期或按需运行的自动化任务,您可以实时更新或迁移数据库中的数据。
-
数据处理:Python具有强大的数据处理功能,您可以在将数据导入数据库之前对其进行预处理、清洗和转换,提高数据导入的成功率。
-
多种数据库支持:Python提供了广泛的数据库驱动程序和库,支持关系型数据库(如MySQL、PostgreSQL、SQLite等)和非关系型数据库(如MongoDB、Redis等)。
-
错误处理和日志记录:使用Python编写导入脚本时,可以捕获异常并记录详细日志,从而更容易地跟踪和解决数据导入过程中的问题。
适用场景举例
- 将CSV或Excel文件中的数据导入数据库。
- 在将数据导入数据库之前,对数据应用统计方法或规范化以进行预处理。
- 包含数据验证和完整性检查的数据库迁移任务。
操作指南
1. 导入需要的Python包
from sqlalchemy import create_engine
import pandas as pd
import psycopg2
import pygwalker as pyg
2. 创建一个连接到PostgreSQL数据库并读取源表数据
def create_conn_pg(database: str, user: str, password: str, host: str, port: str):
connection = psycopg2.connect(
dbname=database,
user=user,
password=password,
host=host,
port=port
)
return connection
3. 使用pd.read_sql_query()方法从数据库中查询数据并将其存储到DataFrame:
conn_pg = create_conn_pg("<database>", "<username>", "<password>", "<host>", "<port>")
query = "SELECT * FROM public.orders;"
df = pd.read_sql_query(query, conn_pg)
conn_pg.close()
4. 查看Postgres数据并对数据进行清洗和可视化分析
# 查看数据总行数
print(df.shape[0])
# 替换所有nan值为0
df.fillna(0, inplace=True)
# 显示前5行数据
print(df.head(5))
5. 使用pygwalker进行数据透视分析(可选)
6. 在ClickZetta Lakehouse创建目标表
engine_cz = create_engine("clickzetta://<username>:<password>@<instanceid>.api.clickzetta.com/<workspacename>?virtualcluster=<vcluster>&schema=<public>")
sql_cz = text("""
CREATE TABLE IF NOT EXISTS orders_tmp (
id INT,
user_id INT,
product_id INT,
subtotal DECIMAL(8, 2),
tax DECIMAL(8, 2),
total DECIMAL(16, 3),
discount DECIMAL(8, 2),
created_at TIMESTAMP,
quantity INT
);
""")
with engine_cz.connect() as conn:
results = conn.execute(sql_cz)
7. 将转换后的源表数据写入到云器Lakehouse目标表
conn_cz = connect(
username="<username>",
password="<password>",
service="api.clickzetta.com",
instance="<instanceid>",
workspace="<workspacename>",
schema="public",
vcluster="default"
)
bulkload_stream = conn_cz.create_bulkload_stream(schema="public", table="orders")
writer = bulkload_stream.open_writer(0)
for index, row_data in df.iterrows():
row = writer.create_row()
row.set_value("id", row_data["id"])
row.set_value("created_at", row_data["created_at"])
row.set_value("user_id", row_data["user_id"])
row.set_value("product_id", row_data["product_id"])
row.set_value("subtotal", row_data["subtotal"])
row.set_value("tax", row_data["tax"])
row.set_value("total", row_data["total"])
row.set_value("discount", row_data["discount"])
row.set_value("quantity", row_data["quantity"])
writer.write(row)
writer.close()
bulkload_stream.commit()
8. 清理
#删除云器 Lakehouse目标表
sql_cz = text("""drop table if exists orders ;""")
with engine_cz.connect() as conn:
results = conn.execute(sql_cz)