数据导入Lakehouse操作实践_通过Python脚本

在现代数据处理中,将数据导入数据库是一个常见的需求。使用Python脚本实现数据导入,可以带来许多优势,适用于多种场景,例如自动化、数据处理、多种数据库支持等。本文将以PostgreSQL数据库为例,演示如何将数据导入云器Lakehouse。

优势介绍

  1. 自动化:通过创建定期或按需运行的自动化任务,您可以实时更新或迁移数据库中的数据。

  2. 数据处理:Python具有强大的数据处理功能,您可以在将数据导入数据库之前对其进行预处理、清洗和转换,提高数据导入的成功率。

  3. 多种数据库支持:Python提供了广泛的数据库驱动程序和库,支持关系型数据库(如MySQL、PostgreSQL、SQLite等)和非关系型数据库(如MongoDB、Redis等)。

  4. 错误处理和日志记录:使用Python编写导入脚本时,可以捕获异常并记录详细日志,从而更容易地跟踪和解决数据导入过程中的问题。

适用场景举例

  1. 将CSV或Excel文件中的数据导入数据库。
  2. 在将数据导入数据库之前,对数据应用统计方法或规范化以进行预处理。
  3. 包含数据验证和完整性检查的数据库迁移任务。

操作指南

1. 导入需要的Python包

from sqlalchemy import create_engine
import pandas as pd
import psycopg2
import pygwalker as pyg

2. 创建一个连接到PostgreSQL数据库并读取源表数据

def create_conn_pg(database: str, user: str, password: str, host: str, port: str):
    connection = psycopg2.connect(
        dbname=database,
        user=user,
        password=password,
        host=host,
        port=port
    )
    return connection

3. 使用pd.read_sql_query()方法从数据库中查询数据并将其存储到DataFrame:

conn_pg = create_conn_pg("<database>", "<username>", "<password>", "<host>", "<port>")
query = "SELECT * FROM public.orders;"
df = pd.read_sql_query(query, conn_pg)
conn_pg.close()

4. 查看Postgres数据并对数据进行清洗和可视化分析

# 查看数据总行数
print(df.shape[0])

# 替换所有nan值为0
df.fillna(0, inplace=True)

# 显示前5行数据
print(df.head(5))

5. 使用pygwalker进行数据透视分析(可选)

walker = pyg.walk(df)

6. 在ClickZetta Lakehouse创建目标表

engine_cz = create_engine("clickzetta://<username>:<password>@<instanceid>.api.clickzetta.com/<workspacename>?virtualcluster=<vcluster>&schema=<public>")
sql_cz = text("""
CREATE TABLE IF NOT EXISTS orders_tmp (
    id INT,
    user_id INT,
    product_id INT,
    subtotal DECIMAL(8, 2),
    tax DECIMAL(8, 2),
    total DECIMAL(16, 3),
    discount DECIMAL(8, 2),
    created_at TIMESTAMP,
    quantity INT
);
""")
with engine_cz.connect() as conn:
    results = conn.execute(sql_cz)

7. 将转换后的源表数据写入到云器Lakehouse目标表

conn_cz = connect(
    username="<username>",
    password="<password>",
    service="api.clickzetta.com",
    instance="<instanceid>",
    workspace="<workspacename>",
    schema="public",
    vcluster="default"
)
bulkload_stream = conn_cz.create_bulkload_stream(schema="public", table="orders")
writer = bulkload_stream.open_writer(0)
for index, row_data in df.iterrows():
    row = writer.create_row()
    row.set_value("id", row_data["id"])
    row.set_value("created_at", row_data["created_at"])
    row.set_value("user_id", row_data["user_id"])
    row.set_value("product_id", row_data["product_id"])
    row.set_value("subtotal", row_data["subtotal"])
    row.set_value("tax", row_data["tax"])
    row.set_value("total", row_data["total"])
    row.set_value("discount", row_data["discount"])
    row.set_value("quantity", row_data["quantity"])
    writer.write(row)
writer.close()
bulkload_stream.commit()

8. 清理

#删除云器 Lakehouse目标表
sql_cz = text("""drop table if exists orders ;""")
with engine_cz.connect() as conn:
    results = conn.execute(sql_cz)

联系我们
预约咨询
微信咨询
电话咨询