Lakehouse Zettapark

Zettapark 是一个用于处理 云器Lakehouse 数据的 Python 库。它提供了一个高级的 Python API,用于在 云器Lakehouse 中执行 SQL 查询、操作数据和处理结果。Zettapark 使得在 Python 中使用 云器Lakehouse 变得更加简单和高效。你可以使用 Zettapark 执行 SQL 查询、操作数据和处理结果,就像在 Python 中使用 pandas 一样。

你也可以下载对应的Jupyter Notebook文件,方便直接运行本Demo。

在Zettapark中执行pandas操作,会被翻译成SQL在云器Lakehouse中执行,从而达到分布式计算。

比如以下 Python 代码:

df_filtered = df.filter((F.col("a") + F.col("b")) < 10)

在云器 Lakehouse 里执行的 SQL 是:

SELECT a, b FROM ( SELECT col1 AS a, col2 AS b FROM  VALUES (CAST(1 AS INT), CAST(3 AS INT)), (CAST(2 AS INT), CAST(10 AS INT))) WHERE ((a + b) < CAST(10 AS INT)) LIMIT 10;

安装clickzetta_zettapark_python

!pip install -q --upgrade clickzetta_zettapark_python -i https://pypi.tuna.tsinghua.edu.cn/simple

创建会话

使用Zettapark的第一步是与ClickZetta Lakehouse建立会话。

导入Session类。

from clickzetta.zettapark.session import Session

和ClickZetta Connector for Python一样,Zettapark的函数中使用的相同参数(例如service、instance、用户名等)与云器Lakehouse建立会话建立会话。 dict构造一个包含这些参数的名称和值的字典 (username、password、service、instance、workspace、schema等)。

要创建会话包括如下:

创建一个 Python 字典 ( dict),其中包含用于连接到 ClickZetta Lakehouse 的参数的名称和值。

将此字典传递给Session.builder.configs方法以返回具有这些连接参数的构建器对象。

create调用的方法建立builder会话。

以下示例使用dict包含连接参数来创建新会话:

hints = dict()
hints['sdk.job.timeout'] = 3
hints['query_tag'] = 'test_conn_hints_zettapark'
connection_parameters = {
  "username": "",
  "password": "",
  "service": "api.clickzetta.com",
  "instance": "",
  "workspace": "",
  "schema": "zettapark",
  "vcluster": "default",
  "sdk_job_timeout": 10,
  "hints": hints,
}
session = Session.builder.configs(connection_parameters).create()

验证会话是否创建成功

df = session.sql("show schemas;")
df.show(5)
---------------------------
|`schema_name`            |
---------------------------
|automobile               |
|automv_schema            |
|brazilianecommerce       |
|continues_computing      |
|continues_pipeline_demo  |
---------------------------

在 Zettapark Python 中使用 DataFrame

在 Zettapark 中,查询和处理数据的主要方式是通过 DataFrame。本主题介绍如何使用 DataFrame。

要检索和操作云器Lakehouse Table里的数据,请使用该类DataFrame。DataFrame 表示延迟评估的关系数据集:它仅在触发特定操作时执行。

要将数据检索到 DataFrame 中:

1,构造一个 DataFrame,指定 dataset 的数据源。

例如,您可以创建一个 DataFrame 来保存来自表、外部 CSV 文件、本地数据或 SQL 语句执行的数据。

2,指定如何转换DataFrame中的数据集。

例如,您可以指定应该选择哪些列、如何过滤行、如何对结果进行排序和分组等。

3,执行语句将数据检索到 DataFrame 中。

为了将数据检索到 DataFrame 中,您必须调用执行操作的方法(例如,方法 collect())。

本示例使用 DataFrame 来查询名为sample_product_data的表。如果要运行这些示例,您可以创建该表并通过执行以下SQL语句并向该表插入一些数据。

session.sql('CREATE  TABLE if not exists sample_product_data (id INT, parent_id INT, category_id INT, name STRING, serial_number STRING, key INT, third INT)').collect()
[Row(result_message='OPERATION SUCCEED')]
session.sql('CREATE  TABLE if not exists sample_product_data_varchar (id INT, parent_id INT, category_id INT, name STRING, serial_number VARCHAR, key INT, third INT)').collect()
[Row(result_message='OPERATION SUCCEED')]
session.sql("""
... INSERT INTO sample_product_data_varchar VALUES
... (1, 0, 5, 'Product 1', 'prod-1', 1, 10),
... (2, 1, 5, 'Product 1A', 'prod-1-A', 1, 20),
... (3, 1, 5, 'Product 1B', 'prod-1-B', 1, 30),
... (4, 0, 10, 'Product 2', 'prod-2', 2, 40),
... (5, 4, 10, 'Product 2A', 'prod-2-A', 2, 50),
... (6, 4, 10, 'Product 2B', 'prod-2-B', 2, 60),
... (7, 0, 20, 'Product 3', 'prod-3', 3, 70),
... (8, 7, 20, 'Product 3A', 'prod-3-A', 3, 80),
... (9, 7, 20, 'Product 3B', 'prod-3-B', 3, 90),
... (10, 0, 50, 'Product 4', 'prod-4', 4, 100),
... (11, 10, 50, 'Product 4A', 'prod-4-A', 4, 100),
... (12, 10, 50, 'Product 4B', 'prod-4-B', 4, 100)
... """).collect()
[Row(result_message='OPERATION SUCCEED')]

验证数据插入是否已成功,请运行:

session.sql("SELECT count(*) FROM sample_product_data_varchar").collect()
[Row(`count`(*)=120)]

构建 DataFrame

要构造 DataFrame,可以使用该类的方法和属性Session。以下每种方法都从不同类型的数据源构造DataFrame。

要从表、视图或流中的数据创建 DataFrame,请调用以下table方法:

# Create a DataFrame from the data in the "sample_product_data" table.
df_table = session.table("sample_product_data")
# To print out the first 10 rows
df_table.show()
---------------------------------------------------------------------------------------
|`id`  |`parent_id`  |`category_id`  |`name`      |`serial_number`  |`key`  |`third`  |
---------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10       |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20       |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30       |
|4     |0            |10             |Product 2   |prod-2           |2      |40       |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50       |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60       |
|7     |0            |20             |Product 3   |prod-3           |3      |70       |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80       |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90       |
|10    |0            |50             |Product 4   |prod-4           |4      |100      |
---------------------------------------------------------------------------------------
# Create a DataFrame from the data in the "sample_product_data" table.
df_table = session.table("sample_product_data_varchar")
# To print out the first 10 rows
df_table.show()
---------------------------------------------------------------------------------------
|`id`  |`parent_id`  |`category_id`  |`name`      |`serial_number`  |`key`  |`third`  |
---------------------------------------------------------------------------------------
|1     |0            |5              |Product 1   |prod-1           |1      |10       |
|2     |1            |5              |Product 1A  |prod-1-A         |1      |20       |
|3     |1            |5              |Product 1B  |prod-1-B         |1      |30       |
|4     |0            |10             |Product 2   |prod-2           |2      |40       |
|5     |4            |10             |Product 2A  |prod-2-A         |2      |50       |
|6     |4            |10             |Product 2B  |prod-2-B         |2      |60       |
|7     |0            |20             |Product 3   |prod-3           |3      |70       |
|8     |7            |20             |Product 3A  |prod-3-A         |3      |80       |
|9     |7            |20             |Product 3B  |prod-3-B         |3      |90       |
|10    |0            |50             |Product 4   |prod-4           |4      |100      |
---------------------------------------------------------------------------------------

要从指定数据创建DataFrame,请调用该create_dataframe方法:

# Create a DataFrame with one column named a from specified values.
df1 = session.create_dataframe([1, 2, 3, 4]).to_df("a")
df1.show()
-------
|`a`  |
-------
|1    |
|2    |
|3    |
|4    |
-------

创建一个具有 4 列的 DataFrame,“a”,“b”,“c” 和“d”:

df2 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"])
df2.show()
-------------------------
|`a`  |`b`  |`c`  |`d`  |
-------------------------
|1    |2    |3    |4    |
-------------------------

创建另一个包含 4 列“a”、“b”、“c”和“d”的 DataFrame:

from clickzetta.zettapark import Row
df3 = session.create_dataframe([Row(a=1, b=2, c=3, d=4)])
df3.show()
-------------------------
|`a`  |`b`  |`c`  |`d`  |
-------------------------
|1    |2    |3    |4    |
-------------------------

创建一个 DataFrame 并指定一个模式:

from clickzetta.zettapark.types import IntegerType, StringType, StructType, StructField
schema = StructType([StructField("a", IntegerType()), StructField("b", StringType())])
df4 = session.create_dataframe([[1, "click"], [3, "zetta"]], schema)
df4.show()
---------------
|`a`  |`b`    |
---------------
|1    |click  |
|3    |zetta  |
---------------

创建包含一系列值的 DataFrame,请调用以下range方法:

df_range = session.range(1, 10, 2).to_df("a")
df_range.show()
-------
|`a`  |
-------
|1    |
|3    |
|5    |
|7    |
|9    |
-------

对指定数据集进行转换

要指定选择哪些列以及如何对结果进行筛选、排序、分组等,请调用转换数据集的 DataFrame 方法。要标识这些方法中的列,请使用col计算结果为列的函数或表达式。

例如: 要指定应返回哪些行,请调用该filter方法:

from clickzetta.zettapark import functions as F
df = session.table("sample_product_data").filter(F.col("id") == 1)
df.show()
--------------------------------------------------------------------------------------
|`id`  |`parent_id`  |`category_id`  |`name`     |`serial_number`  |`key`  |`third`  |
--------------------------------------------------------------------------------------
|1     |0            |5              |Product 1  |prod-1           |1      |10       |
--------------------------------------------------------------------------------------

要指定应选择的列,请调用该select方法:

# Create a DataFrame that contains the id, name, and serial_number
# columns in the "sample_product_data" table.
df = session.table("sample_product_data").select(F.col("id"), F.col("name"), F.col("serial_number"))
df.show()
---------------------------------------
|`id`  |`name`      |`serial_number`  |
---------------------------------------
|1     |Product 1   |prod-1           |
|2     |Product 1A  |prod-1-A         |
|3     |Product 1B  |prod-1-B         |
|4     |Product 2   |prod-2           |
|5     |Product 2A  |prod-2-A         |
|6     |Product 2B  |prod-2-B         |
|7     |Product 3   |prod-3           |
|8     |Product 3A  |prod-3-A         |
|9     |Product 3B  |prod-3-B         |
|10    |Product 4   |prod-4           |
---------------------------------------
# Import the col function from the functions module.
df_product_info = session.table("sample_product_data")
df1 = df_product_info.select(df_product_info["id"], df_product_info["name"], df_product_info["serial_number"])
df2 = df_product_info.select(df_product_info.id, df_product_info.name, df_product_info.serial_number)
df3 = df_product_info.select("id", "name", "serial_number")

连接DataFrame

要连接 DataFrame 对象,请调用以下join方法:

# Create two DataFrames to join
df_lhs = session.create_dataframe([["a", 1], ["b", 2]], schema=["key", "value1"])
df_rhs = session.create_dataframe([["a", 3], ["b", 4]], schema=["key", "value2"])
# Create a DataFrame that joins the two DataFrames
# on the column named "key".
df_lhs.join(df_rhs, df_lhs.col("key") == df_rhs.col("key")).select(df_lhs["key"].as_("key"), "value1", "value2").show()
-------------------------------
|`key`  |`value1`  |`value2`  |
-------------------------------
|a      |1         |3         |
|b      |2         |4         |
-------------------------------
import copy

df = session.table("sample_product_data")
# This fails because columns named "id" and "parent_id"
# are in the left and right DataFrames in the join.
df_copy = copy.copy(df)
df_joined = df.join(df_copy, F.col("id") == F.col("parent_id"))

指定列和表达式

调用这些转换方法时,您可能需要指定列或使用列的表达式。例如,调用该select方法时,您需要指定要选择的列。

df_product_info = session.table("sample_product_data").select(F.col("id"), F.col("name"))
df_product_info.show()
---------------------
|`id`  |`name`      |
---------------------
|1     |Product 1   |
|2     |Product 1A  |
|3     |Product 1B  |
|4     |Product 2   |
|5     |Product 2A  |
|6     |Product 2B  |
|7     |Product 3   |
|8     |Product 3A  |
|9     |Product 3B  |
|10    |Product 4   |
---------------------

当指定过滤器、投影、连接条件等时,可以Column在表达式中使用对象。例如: 您可以使用Column对象和filter方法来指定过滤条件:

# Specify the equivalent of "WHERE id = 20"
# in a SQL SELECT statement.
df_filtered = df_product_info.filter(F.col("id") == 20)
df_filtered.show()
-----------------
|`id`  |`name`  |
-----------------
|      |        |
-----------------
df = session.create_dataframe([[1, 3], [2, 10]], schema=["a", "b"])
# Specify the equivalent of "WHERE a + b < 10"
# in a SQL SELECT statement.
df_filtered = df.filter((F.col("a") + F.col("b")) < 10)
df_filtered.show()
-------------
|`a`  |`b`  |
-------------
|1    |3    |
-------------

关闭会话

session.close()

资料

Zettapark Samples

联系我们
预约咨询
微信咨询
电话咨询