Lakehouse Zettapark
Zettapark 是一个用于处理 云器Lakehouse 数据的 Python 库。它提供了一个高级的 Python API,用于在 云器Lakehouse 中执行 SQL 查询、操作数据和处理结果。Zettapark 使得在 Python 中使用 云器Lakehouse 变得更加简单和高效。你可以使用 Zettapark 执行 SQL 查询、操作数据和处理结果,就像在 Python 中使用 pandas 一样。
你也可以下载对应的Jupyter Notebook文件,方便直接运行本Demo。
在Zettapark中执行pandas操作,会被翻译成SQL在云器Lakehouse中执行,从而达到分布式计算。
比如以下 Python 代码:
df_filtered = df.filter((F.col("a") + F.col("b")) < 10)
在云器 Lakehouse 里执行的 SQL 是:
SELECT a, b FROM ( SELECT col1 AS a, col2 AS b FROM VALUES (CAST(1 AS INT), CAST(3 AS INT)), (CAST(2 AS INT), CAST(10 AS INT))) WHERE ((a + b) < CAST(10 AS INT)) LIMIT 10;
安装clickzetta_zettapark_python
!pip install -q --upgrade clickzetta_zettapark_python -i https://pypi.tuna.tsinghua.edu.cn/simple
创建会话
使用Zettapark的第一步是与ClickZetta Lakehouse建立会话。
导入Session类。
from clickzetta.zettapark.session import Session
和ClickZetta Connector for Python一样,Zettapark的函数中使用的相同参数(例如service、instance、用户名等)与云器Lakehouse建立会话建立会话。 dict构造一个包含这些参数的名称和值的字典 (username、password、service、instance、workspace、schema等)。
要创建会话包括如下:
创建一个 Python 字典 ( dict),其中包含用于连接到 ClickZetta Lakehouse 的参数的名称和值。 将此字典传递给Session.builder.configs方法以返回具有这些连接参数的构建器对象。 create调用的方法建立builder会话。
以下示例使用dict包含连接参数来创建新会话:
hints = dict() hints['sdk.job.timeout'] = 3 hints['query_tag'] = 'test_conn_hints_zettapark' connection_parameters = { "username": "", "password": "", "service": "api.clickzetta.com", "instance": "", "workspace": "", "schema": "zettapark", "vcluster": "default", "sdk_job_timeout": 10, "hints": hints, } session = Session.builder.configs(connection_parameters).create()
验证会话是否创建成功
df = session.sql("show schemas;") df.show(5)
--------------------------- |`schema_name` | --------------------------- |automobile | |automv_schema | |brazilianecommerce | |continues_computing | |continues_pipeline_demo | ---------------------------
在 Zettapark Python 中使用 DataFrame
在 Zettapark 中,查询和处理数据的主要方式是通过 DataFrame。本主题介绍如何使用 DataFrame。
要检索和操作云器Lakehouse Table里的数据,请使用该类DataFrame。DataFrame 表示延迟评估的关系数据集:它仅在触发特定操作时执行。
要将数据检索到 DataFrame 中:
1,构造一个 DataFrame,指定 dataset 的数据源。 例如,您可以创建一个 DataFrame 来保存来自表、外部 CSV 文件、本地数据或 SQL 语句执行的数据。 2,指定如何转换DataFrame中的数据集。 例如,您可以指定应该选择哪些列、如何过滤行、如何对结果进行排序和分组等。 3,执行语句将数据检索到 DataFrame 中。 为了将数据检索到 DataFrame 中,您必须调用执行操作的方法(例如,方法 collect())。
本示例使用 DataFrame 来查询名为sample_product_data的表。如果要运行这些示例,您可以创建该表并通过执行以下SQL语句并向该表插入一些数据。
session.sql('CREATE TABLE if not exists sample_product_data (id INT, parent_id INT, category_id INT, name STRING, serial_number STRING, key INT, third INT)').collect()
[Row(result_message='OPERATION SUCCEED')]
session.sql('CREATE TABLE if not exists sample_product_data_varchar (id INT, parent_id INT, category_id INT, name STRING, serial_number VARCHAR, key INT, third INT)').collect()
[Row(result_message='OPERATION SUCCEED')]
session.sql(""" ... INSERT INTO sample_product_data_varchar VALUES ... (1, 0, 5, 'Product 1', 'prod-1', 1, 10), ... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20), ... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30), ... (4, 0, 10, 'Product 2', 'prod-2', 2, 40), ... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50), ... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60), ... (7, 0, 20, 'Product 3', 'prod-3', 3, 70), ... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80), ... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90), ... (10, 0, 50, 'Product 4', 'prod-4', 4, 100), ... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100), ... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100) ... """).collect()
[Row(result_message='OPERATION SUCCEED')]
验证数据插入是否已成功,请运行:
session.sql("SELECT count(*) FROM sample_product_data_varchar").collect()
[Row(`count`(*)=120)]
构建 DataFrame
要构造 DataFrame,可以使用该类的方法和属性Session。以下每种方法都从不同类型的数据源构造DataFrame。
要从表、视图或流中的数据创建 DataFrame,请调用以下table方法:
# Create a DataFrame from the data in the "sample_product_data" table. df_table = session.table("sample_product_data") # To print out the first 10 rows df_table.show()
--------------------------------------------------------------------------------------- |`id` |`parent_id` |`category_id` |`name` |`serial_number` |`key` |`third` | --------------------------------------------------------------------------------------- |1 |0 |5 |Product 1 |prod-1 |1 |10 | |2 |1 |5 |Product 1A |prod-1-A |1 |20 | |3 |1 |5 |Product 1B |prod-1-B |1 |30 | |4 |0 |10 |Product 2 |prod-2 |2 |40 | |5 |4 |10 |Product 2A |prod-2-A |2 |50 | |6 |4 |10 |Product 2B |prod-2-B |2 |60 | |7 |0 |20 |Product 3 |prod-3 |3 |70 | |8 |7 |20 |Product 3A |prod-3-A |3 |80 | |9 |7 |20 |Product 3B |prod-3-B |3 |90 | |10 |0 |50 |Product 4 |prod-4 |4 |100 | ---------------------------------------------------------------------------------------
# Create a DataFrame from the data in the "sample_product_data" table. df_table = session.table("sample_product_data_varchar") # To print out the first 10 rows df_table.show()
--------------------------------------------------------------------------------------- |`id` |`parent_id` |`category_id` |`name` |`serial_number` |`key` |`third` | --------------------------------------------------------------------------------------- |1 |0 |5 |Product 1 |prod-1 |1 |10 | |2 |1 |5 |Product 1A |prod-1-A |1 |20 | |3 |1 |5 |Product 1B |prod-1-B |1 |30 | |4 |0 |10 |Product 2 |prod-2 |2 |40 | |5 |4 |10 |Product 2A |prod-2-A |2 |50 | |6 |4 |10 |Product 2B |prod-2-B |2 |60 | |7 |0 |20 |Product 3 |prod-3 |3 |70 | |8 |7 |20 |Product 3A |prod-3-A |3 |80 | |9 |7 |20 |Product 3B |prod-3-B |3 |90 | |10 |0 |50 |Product 4 |prod-4 |4 |100 | ---------------------------------------------------------------------------------------
要从指定数据创建DataFrame,请调用该create_dataframe方法:
# Create a DataFrame with one column named a from specified values. df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a") df1.show()
------- |`a` | ------- |1 | |2 | |3 | |4 | -------
创建一个具有 4 列的 DataFrame,“a”,“b”,“c” 和“d”:
df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"]) df2.show()
------------------------- |`a` |`b` |`c` |`d` | ------------------------- |1 |2 |3 |4 | -------------------------
创建另一个包含 4 列“a”、“b”、“c”和“d”的 DataFrame:
from clickzetta.zettapark import Row df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)]) df3.show()
------------------------- |`a` |`b` |`c` |`d` | ------------------------- |1 |2 |3 |4 | -------------------------
创建一个 DataFrame 并指定一个模式:
from clickzetta.zettapark.types import IntegerType, StringType, StructType, StructField schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())]) df4 = session.create_dataframe([[1, "click"], [3, "zetta"]], schema) df4.show()
--------------- |`a` |`b` | --------------- |1 |click | |3 |zetta | ---------------
创建包含一系列值的 DataFrame,请调用以下range方法:
df_range = session.range(1, 10, 2).to_df("a") df_range.show()
------- |`a` | ------- |1 | |3 | |5 | |7 | |9 | -------
对指定数据集进行转换
要指定选择哪些列以及如何对结果进行筛选、排序、分组等,请调用转换数据集的 DataFrame 方法。要标识这些方法中的列,请使用col计算结果为列的函数或表达式。
例如: 要指定应返回哪些行,请调用该filter方法:
from clickzetta.zettapark import functions as F
df = session.table("sample_product_data").filter(F.col("id") == 1) df.show()
-------------------------------------------------------------------------------------- |`id` |`parent_id` |`category_id` |`name` |`serial_number` |`key` |`third` | -------------------------------------------------------------------------------------- |1 |0 |5 |Product 1 |prod-1 |1 |10 | --------------------------------------------------------------------------------------
要指定应选择的列,请调用该select方法:
# Create a DataFrame that contains the id, name, and serial_number # columns in the "sample_product_data" table. df = session.table("sample_product_data").select(F.col("id"), F.col("name"), F.col("serial_number")) df.show()
--------------------------------------- |`id` |`name` |`serial_number` | --------------------------------------- |1 |Product 1 |prod-1 | |2 |Product 1A |prod-1-A | |3 |Product 1B |prod-1-B | |4 |Product 2 |prod-2 | |5 |Product 2A |prod-2-A | |6 |Product 2B |prod-2-B | |7 |Product 3 |prod-3 | |8 |Product 3A |prod-3-A | |9 |Product 3B |prod-3-B | |10 |Product 4 |prod-4 | ---------------------------------------
# Import the col function from the functions module. df_product_info = session.table("sample_product_data") df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"]) df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number) df3 = df_product_info.select("id", "name", "serial_number")
连接DataFrame
要连接 DataFrame 对象,请调用以下join方法:
# Create two DataFrames to join df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"]) df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"]) # Create a DataFrame that joins the two DataFrames # on the column named "key". df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
------------------------------- |`key` |`value1` |`value2` | ------------------------------- |a |1 |3 | |b |2 |4 | -------------------------------
import copy df = session.table("sample_product_data") # This fails because columns named "id" and "parent_id" # are in the left and right DataFrames in the join. df_copy = copy.copy(df) df_joined = df.join(df_copy, F.col("id") == F.col("parent_id"))
指定列和表达式
调用这些转换方法时,您可能需要指定列或使用列的表达式。例如,调用该select方法时,您需要指定要选择的列。
df_product_info = session.table("sample_product_data").select(F.col("id"), F.col("name")) df_product_info.show()
--------------------- |`id` |`name` | --------------------- |1 |Product 1 | |2 |Product 1A | |3 |Product 1B | |4 |Product 2 | |5 |Product 2A | |6 |Product 2B | |7 |Product 3 | |8 |Product 3A | |9 |Product 3B | |10 |Product 4 | ---------------------
当指定过滤器、投影、连接条件等时,可以Column在表达式中使用对象。例如: 您可以使用Column对象和filter方法来指定过滤条件:
# Specify the equivalent of "WHERE id = 20" # in a SQL SELECT statement. df_filtered = df_product_info.filter(F.col("id") == 20) df_filtered.show()
----------------- |`id` |`name` | ----------------- | | | -----------------
df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"]) # Specify the equivalent of "WHERE a + b < 10" # in a SQL SELECT statement. df_filtered = df.filter((F.col("a") + F.col("b")) < 10) df_filtered.show()
------------- |`a` |`b` | ------------- |1 |3 | -------------
关闭会话
session.close()