基于Lakehouse Zettapark进行数据工程Data Engineering

这是一个非常基本的数据工程示例,演示了通过云器Zettapark Python代码进行读取、分组和写入数据等基本DataFrame操作。本示例使用云器Lakehouse内置的免费示例数据库(clickzetta_sample_data.tpch_100g)作为数据源。

步骤如下:

1,通过Zettapark连接到云器Lakehouse

2,通过SupplierKey连接 2个大型表 (LINEITEMS 有 6亿行 & SUPPLIER 有 100 万行)

3,通过将虚拟计算集群调整到不同规格来演示按需扩展

4,对比不同规格的虚拟计算集群对同一个任务的执行时间的不同

    汇总供应商和零件编号的数据以计算总和、最小值和最大值(3500万行)

    将结果数据框写入云器Lakehouse物理表(8000万行)

也可以通过下载Jupyter Notebook文件直接运行本实例。

整个操作从调整计算资源、读取数据、连接、汇总大约需要30来秒,向您展示了云器Lakehouse强大的功能和即时可扩展性和性能

安装云器Zettapark

# !pip install clickzetta-zettapark-python

通过ZettaPark连接到云器Lakehouse(& without PySpark)

import time
from clickzetta.zettapark.session import Session
import clickzetta.zettapark.functions as f
from clickzetta.zettapark import Session, DataFrame
from clickzetta.zettapark.functions import udf, col
from clickzetta.zettapark.types import IntegerType
from clickzetta.zettapark.functions import call_udf

<----- Make these changes before running the notebook -------------------- Change Connection params to match your environment <----------------------------------------------------------------------------

VCLUSTER_Name = 'default_ap' VCLUSTER_Size = "XSMALL" VCLUSTER_ReSize = "MEDIUM" Workspace_Name = 'gharchive' Schema_Name = 'Public'

import json
from clickzetta.zettapark.session import Session
# 1- 创建会话连接云器Lakehouse
# 从配置文件中读取参数
with open('config.json', 'r') as config_file:
config = json.load(config_file)

print("正在连接到云器Lakehouse.....n")


# 创建会话
session = Session.builder.configs(config).create()

print("连接成功!...n")

正在连接到云器Lakehouse..... 连接成功!...

sql_cmd = f"CREATE VCLUSTER IF NOT EXISTS {VCLUSTER_Name} VCLUSTER_SIZE = {VCLUSTER_Size} AUTO_SUSPEND_IN_SECOND = 10 "
print("XSMALL VCLUSTER 创建就绪 n")

session.sql(sql_cmd).collect()

session.use_schema(Schema_Name)

XSMALL VCLUSTER 创建就绪

config.json文件样本(参数说明):

{

  "username": "请替换为您的用户名",

  "password": "请替换为您的密码",

  "service": "请替换为您的服务地址",

  "instance": "请替换为您的实例ID",

  "workspace": "请替换为您的工作空间",

  "schema": "请替换为您的模式",

  "vcluster": "请替换为您的虚拟集群",

  "sdk_job_timeout": 60,

  "hints": {

    "sdk.job.timeout": 60,

    "query_tag": "test_conn_hints_zettapark"

  }

}

开始数据工程Data Engineering Process

from clickzetta.zettapark.functions import col, sum, min, max

print("Joining, Aggregating with 2 large tables(600M & 1M rows) & Writing results to new table(80M rows) ..n")

# 2- define table
dfLineItems = session.table("clickzetta_sample_data.tpch_100g.LINEITEM") # 600 Million Rows
dfSuppliers = session.table("clickzetta_sample_data.tpch_100g.SUPPLIER") # 100K Rows

print('Lineitems Table: %s 行' % dfLineItems.count())
print('Suppliers Table: %s 行' % dfSuppliers.count())

# 3 - JOIN TABLES
dfJoinTables = dfLineItems.join(dfSuppliers, dfLineItems["L_SUPPKEY"] == dfSuppliers["S_SUPPKEY"])

# 4 - SUMMARIZE THE DATA BY SUPPLIER, PART, SUM, MIN & MAX
dfSummary = dfJoinTables.groupBy("S_NAME", "L_PARTKEY").agg(
sum(col("L_QUANTITY")).alias("TOTAL_QTY"),
min(col("L_QUANTITY")).alias("MIN_QTY"),
max(col("L_QUANTITY")).alias("MAX_QTY")
)

dfSummary.show()
Joining, Aggregating with 2 large tables(600M & 1M rows) & Writing results to new table(80M rows) ..
Lineitems Table: 600037902 行
Suppliers Table: 1000000 行
------------------------------------------------------------------
|         s_name         |   l_partkey    | total_qty | min_qty | max_qty |
------------------------------------------------------------------
| Supplier#000102785     |  18602748      |   156.00  |  12.00  |  49.00  |
| Supplier#000268783     |   8268782      |   228.00  |   1.00  |  44.00  |
| Supplier#000680518     |  12680517      |   107.00  |   8.00  |  47.00  |
| Supplier#000981141     |   1731139      |   228.00  |   5.00  |  48.00  |
| Supplier#000172390     |   1172389      |   192.00  |   1.00  |  38.00  |
| Supplier#000763964     |   1763963      |   174.00  |   1.00  |  47.00  |
| Supplier#000087125     |  16337076      |   168.00  |   4.00  |  50.00  |
| Supplier#000092530     |   1842528      |   169.00  |   1.00  |  48.00  |
| Supplier#000366762     |  18866725      |   156.00  |   3.00  |  41.00  |
| Supplier#000785842     |   4285833      |   238.00  |   2.00  |  40.00  |
------------------------------------------------------------------
# 2 - READ & JOIN 2 LARGE TABLES (600M & 1M rows)
from clickzetta.zettapark.functions import col, sum, min, max

print("正在合并和聚合两个大表(6亿行和100万行),并将结果写入新表(8000万行)..n")

dfLineItems = session.table("clickzetta_sample_data.tpch_100g.LINEITEM") # 600 Million Rows
dfSuppliers = session.table("clickzetta_sample_data.tpch_100g.SUPPLIER") # 1 Million Rows

print('Lineitems Table: %s 行' % dfLineItems.count())
print('Suppliers Table: %s 行' % dfSuppliers.count())

# 3 - JOIN TABLES
dfJoinTables = dfLineItems.join(dfSuppliers, dfLineItems["L_SUPPKEY"] == dfSuppliers["S_SUPPKEY"])

# 4 - SUMMARIZE THE DATA BY SUPPLIER, PART, SUM, MIN & MAX
dfSummary = dfJoinTables.groupBy("S_NAME", "L_PARTKEY").agg(
sum("L_QUANTITY").alias("TOTAL_QTY"),
min("L_QUANTITY").alias("MIN_QTY"),
max("L_QUANTITY").alias("MAX_QTY")
)

dfSummary.show()
正在合并和聚合两个大表(6亿行和100万行),并将结果写入新表(8000万行)..n
Lineitems Table: 600037902 行
Suppliers Table: 1000000 行
------------------------------------------------------------------
|         s_name         |  l_partkey   | total_qty | min_qty | max_qty |
------------------------------------------------------------------
| Supplier#000543332     |  14043303    |   164.00  |  17.00  |  50.00  |
| Supplier#000162101     |   6412082    |   243.00  |   3.00  |  49.00  |
| Supplier#000170221     |   9920211    |   204.00  |  10.00  |  48.00  |
| Supplier#000652699     |   4402694    |   215.00  |   3.00  |  46.00  |
| Supplier#000635296     |   1635295    |   153.00  |   3.00  |  29.00  |
| Supplier#000915082     |   3665078    |   228.00  |   3.00  |  42.00  |
| Supplier#000624767     |  15624766    |   149.00  |  11.00  |  37.00  |
| Supplier#000899746     |   4399737    |   202.00  |   1.00  |  48.00  |
| Supplier#000285255     |   6285254    |   274.00  |  10.00  |  48.00  |
| Supplier#000052105     |  19552066    |   307.00  |   1.00  |  48.00  |
------------------------------------------------------------------

3. 通过不同计算资源(虚拟集群)完成同样的计算任务需要不同的时间,查看弹性扩缩的效果.

start_time = time.time()

# 4 - 将虚拟计算集群大小调整为 XSMALL
print(f"正在调整到 {VCLUSTER_Size} ..")

sql_cmd = f"ALTER VCLUSTER {VCLUSTER_Name} SET VCLUSTER_SIZE = '{VCLUSTER_Size}' "
session.sql(sql_cmd).collect()

print("完成!...nn")


# 5 - 将结果写入新表(8000万行)
# <-- 这是当所有之前的操作编译并作为单个作业执行时
print("正在创建目标 SALES_SUMMARY 表...nn")
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
print("目标表已创建!...")

# 6 - 查询结果(8000万行)
print("正在查询结果...n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()
end_time = time.time()

print("--- 连接、汇总和写入结果到新表用了 %s 秒 --- n" % int(end_time - start_time))
print("--- 向 SALES_SUMMARY 表写入了 %s 行" % dfSales.count())

# 7 - 将虚拟计算集群大小减少到 XSMALL
print("将 VCLUSTER 缩小到 XS...n")
sql_cmd = "ALTER VCLUSTER {} SET VCLUSTER_SIZE = 'XSMALL'".format(VCLUSTER_Name)
session.sql(sql_cmd).collect()

print("完成!...n")
正在调整到 XSMALL ..
完成!...
正在创建目标 SALES_SUMMARY 表...
目标表已创建!...
正在查询结果...
------------------------------------------------------------------
|         s_name         |   l_partkey   | total_qty | min_qty | max_qty |
------------------------------------------------------------------
| Supplier#000966043     |   1216039     |   173.00  |  9.00   |  50.00  |
| Supplier#000986803     |  17236751     |   164.00  |  5.00   |  41.00  |
| Supplier#000081344     |    81343      |   112.00  |  7.00   |  50.00  |
| Supplier#000905118     |  12405093     |   184.00  |  1.00   |  48.00  |
| Supplier#000922670     |  14172627     |   179.00  |  6.00   |  46.00  |
| Supplier#000873089     |  12373064     |   126.00  |  9.00   |  37.00  |
| Supplier#000389530     |   9889511     |   253.00  |  1.00   |  48.00  |
| Supplier#000668325     |  14668324     |   192.00  |  4.00   |  45.00  |
| Supplier#000788387     |  11538375     |   222.00  |  7.00   |  49.00  |
| Supplier#000196264     |  13946250     |   277.00  |  5.00   |  45.00  |
------------------------------------------------------------------

--- 连接、汇总和写入结果到新表用了 75 秒 ---
--- 向 SALES_SUMMARY 表写入了 79975543 行
将 VCLUSTER 缩小到 XS...
完成!...
start_time = time.time()

# 4 - 将虚拟计算集群大小增加到 MEDIUM
print(f"正在将 {VCLUSTER_Size} 调整为 {VCLUSTER_ReSize} ..")

sql_cmd = f"ALTER VCLUSTER {VCLUSTER_Name} SET VCLUSTER_SIZE = '{VCLUSTER_ReSize}'"
session.sql(sql_cmd).collect()

print("完成!...nn")


# 5 - 将结果写入新表(8000万行)
# <-- 这是当所有之前的操作编译并作为单个作业执行时
print("正在创建目标 SALES_SUMMARY 表...nn")
dfSummary.write.mode("overwrite").saveAsTable("SALES_SUMMARY")
print("目标表已创建!...")

# 6 - 查询结果(8000万行)
print("正在查询结果...n")
dfSales = session.table("SALES_SUMMARY")
dfSales.show()
end_time = time.time()
print("--- 连接、汇总和写入结果到新表用了 %s 秒 --- n" % int(end_time - start_time))
print("--- 向 SALES_SUMMARY 表写入了 %s 行" % dfSales.count())

# 7 - 将虚拟计算集群大小少到 XSMALL
print("将 VCLUSTER 缩小到 XSMALL...n")
sql_cmd = f"ALTER VCLUSTER {VCLUSTER_Name} SET VCLUSTER_SIZE = {VCLUSTER_Size}"
session.sql(sql_cmd).collect()

print("完成!...n")
正在将 XSMALL 调整为 MEDIUM ..
完成!...
正在创建目标 SALES_SUMMARY 表...
目标表已创建!...
正在查询结果...
------------------------------------------------------------------
|         s_name         | l_partkey  | total_qty | min_qty | max_qty |
------------------------------------------------------------------
| Supplier#000577084     |  3327080   |   220.00  |  15.00  |  49.00  |
| Supplier#000971635     | 12721622   |   263.00  |   4.00  |  50.00  |
| Supplier#000914390     |  5664384   |   113.00  |   5.00  |  38.00  |
| Supplier#000158186     |  2908183   |   241.00  |   7.00  |  46.00  |
| Supplier#000842304     | 13842303   |   180.00  |  12.00  |  40.00  |
| Supplier#000024822     |  9524803   |   181.00  |   1.00  |  48.00  |
| Supplier#000851711     |  7351696   |   346.00  |   3.00  |  50.00  |
| Supplier#000512255     |  6512254   |   250.00  |   3.00  |  50.00  |
| Supplier#000392018     | 18141999   |   164.00  |   1.00  |  48.00  |
| Supplier#000020477     |  9770467   |    81.00  |   4.00  |  25.00  |
------------------------------------------------------------------

--- 连接、汇总和写入结果到新表用了 18 秒 ---
--- 向 SALES_SUMMARY 表写入了 79975543 行
将 VCLUSTER 缩小到 XSMALL...
完成!...

Zettapark 相对于 Spark 和 PySpark 的优势

  • 迁移快捷 因为代码基本相同,不需要重新学习新语言
  • 更便宜 因为计算完全Severless化。它可以秒级扩展(向上/向下),并且仅在使用时运行(产生成本)。
  • 秒级即时扩缩 同样的计算任务,XSMALL规格虚拟计算集群VCluster需要75秒左右,而MEDIUM规格的只需要20秒。
  • 更快 因为消除了所有不必要的数据移动 = 使用计算的时间更少 = 成本更低
  • 更易使用 = 更少的人力 因为计算和存储几乎不需要维护。

联系我们
预约咨询
微信咨询
电话咨询