from sqlalchemy import text
from clickzetta_dbutils import get_active_engine
pg_engine = get_active_engine("qiliang_test_pg")
# 连接并执行查询
with pg_engine.connect() as pgconnection:
result = pgconnection.execute(text("SELECT * FROM question limit 10;"))
for row in result:
print(row)
from sqlalchemy import text
from clickzetta_dbutils import get_active_engine
pg_engine = get_active_engine("qiliang_test_pg",schema="sample",options={"search_path":"public"})
# 连接并执行查询
with pg_engine.connect() as pgconnection:
result = pgconnection.execute(text("SELECT * FROM accounts limit 10;"))
for row in result:
print(row)
from sqlalchemy import text
import pandas as pd
from clickzetta_dbutils import get_active_lakehouse_engine
engine = get_active_lakehouse_engine(vcluster="default",schema="brazilianecommerce")
# 连接并执行查询
with engine.connect() as connection:
result = connection.execute(text("SELECT * FROM olist_customers limit 10;"))
df = pd.DataFrame(result.fetchall(), columns=result.keys())
print(df.head(10))
from sqlalchemy import text
from clickzetta_dbutils import get_active_engine
# 使用默认schema
engine = get_active_engine("postgres_source_name")
with engine.connect() as conn:
results = conn.execute(text("SELECT * FROM pg_tables WHERE schemaname = 'public';"))
for row in results:
print(row)
# 指定database和options中指定schema
engine = get_active_engine("postgres_source_name",
schema="pg_database",
options={"search_path":"pg_schema"})
Python节点中使用MySQL数据源示例
from sqlalchemy import text
from clickzetta_dbutils import DatabaseConnectionManager
# 查看所有可用的数据源配置
print(DatabaseConnectionManager.load_connection_configs())
# 创建连接并指定schema
manager = DatabaseConnectionManager("mysql_source_name")
manager.use_schema("test_schema")
engine = manager.build()
with engine.connect() as conn:
result = conn.execute(text("select * from test_table limit 1;"))
Python节点中使用Lakehouse数据源示例
from sqlalchemy import text
from clickzetta_dbutils import get_active_engine
# 方式一:使用get_active_engine
engine = get_active_engine("LAKEHOUSE_source_name",
vcluster="default",
workspace="test-workspace",
schema="public")
# 方式二:使用get_active_lakehouse_engine
from clickzetta_dbutils import get_active_lakehouse_engine
engine = get_active_lakehouse_engine(vcluster="default",
workspace="test-workspace")
with engine.connect() as conn:
results = conn.execute(text("select 1"))
for row in results:
print(row)
Shell节点中使用数据源示例
在Shell节点中,可以通过创建Python脚本文件的方式使用数据源:
cat >> /tmp/db_utils_demo.py << EOF
from sqlalchemy import text
from clickzetta_dbutils import get_active_engine
engine = get_active_engine("postgres_source_name")
with engine.connect() as conn:
results = conn.execute(text("SELECT * FROM test_table;"))
for row in results:
print(row)
EOF
python /tmp/db_utils_demo.py