Python任务使用实践
在任务开发模块中提供的Python任务,作为一种轻量级资源容器下的特定任务类型,专为运行Python代码而设计,提供了任务间环境隔离的功能,并具备基础的环境定制能力。本文介绍Python任务的一些使用实践。
运行环境和定制
Python任务在系统预设的Pod环境中执行,预装的Python版本为Python 3(当前版本为3.9.2,未来可能会更新升级)。
系统默认镜像中包含了一些常用的依赖包,以支持与云器Lakehouse的连接和数据访问,以及对阿里云OSS和腾讯云COS等对象存储服务的操作。这些依赖包括但不限于:
- clickzetta-connector
- clickzetta-sqlalchemy
- cos-python-sdk-v5
- numpy
- oss2
- pandas
- six
- urllib2
- ...
为了满足特定的运行需求,Pod环境提供了有限的环境定制能力。您可以在/home/system_normal
路径下进行自定义安装。以下是一个示例代码片段,展示了如何在Python环境中安装自定义安装包(第4行和第5行)并使用。请注意,Python任务执行完毕后,Pod环境将会被销毁,因此任何环境定制都不会被保留。
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "mysql-connector-python","--target", "/home/system_normal", "-i", "https://pypi.tuna.tsinghua.edu.cn/simple"])
sys.path.append('/home/system_normal')
import mysql.connector
# 创建连接
connection = mysql.connector.connect(
host='127.0.0.1', # 数据库主机地址
user='****', # 数据库用户名
password='***********', # 数据库密码
database='demo' # 需要连接的数据库名称
)
# 创建游标
cursor = connection.cursor()
# 执行查询
query = "show tables" # 替换为你的 SQL 查询
cursor.execute(query)
# 获取查询结果
results = cursor.fetchall()
print("Query results:")
for row in results:
print(row)
# 关闭游标和连接
cursor.close()
connection.close()
运行资源大小调整
默认情况下,Pod提供0.5个CPU核和512MB的内存资源。如果需要,您可以在任务的调度配置中通过以下参数调整资源分配:
通过合理配置这些参数,您可以确保Python任务拥有足够的资源来满足不同的计算需求,同时避免资源浪费。
更多使用案例
使用 Python Database API 查询Lakehouse数据
from clickzetta import connect
# 建立连接
conn = connect(
username='your_username',
password='your_password',
service='api.clickzetta.com',
instance='your_instance',
workspace='your_workspace',
schema='public',
vcluster='default'
)
# 创建游标对象
cursor = conn.cursor()
# 执行 SQL 查询
cursor.execute('SELECT * FROM clickzetta_sample_data.ecommerce_events_history.ecommerce_events_multicategorystore_live LIMIT 10;')
# 获取查询结果
results = cursor.fetchall()
for row in results:
print(row)
使用 SQLAlchemy 接口查询Lakehouse数据
from sqlalchemy import create_engine
from sqlalchemy import text
# 创建 ClickZetta Lakehouse 的 SQLAlchemy 引擎实例
engine = create_engine(
"clickzetta://username:password@instance.api.clickzetta.com/workspace?schema=schema&vcluster=default"
)
# 执行 SQL 查询
sql = text("SELECT * FROM ecommerce_events_multicategorystore_live;")
# 使用引擎执行查询
with engine.connect() as conn:
result = conn.execute(sql)
for row in result:
print(row)
使用 Python 批量上传数据到Lakehouse
from clickzetta import connect
conn = connect(
username='your_username',
password='your_password',
service='api.clickzetta.com',
instance='your_instance',
workspace='your_workspace',
schema='public',
vcluster='default'
)
bulkload_stream = conn.create_bulkload_stream(schema='public', table='bulkload_test')
writer = bulkload_stream.open_writer(0)
for index in range(1000000):
row = writer.create_row()
row.set_value('i', index)
row.set_value('s', 'Hello')
row.set_value('d', 123.456)
writer.write(row)
writer.close()
bulkload_stream.commit()