(SaaS)² :云器Lakehouse+Zilliz,Make Data Ready for BI and AI
方案简介
-
(SaaS)² :云器Lakehouse和Zilliz都提供了基于主流云服务的SaaS模式的服务,通过SaaS服务的组合,最大程度从全托管和按量付费的SaaS模式获益。
-
Make Data Ready for BI and AI:云器Lakehouse的数据仓库专注于面向BI应用提供标量数据的存储、处理和分析,Zilliz向量数据库专注于面向AI的增强数据分析。通过云器Lakehouse和Zilliz向量数据库的集成,提供完整的生产级别的BI+AI解决方案,解决BI、AI不对称的问题:
- BI数据和AI数据新鲜度不对称:Zilliz Vector Data Pipeline提供批量进行数据Embedding的服务,和非批量相比Embedding时间缩短10倍+,大幅提升AI数据的新鲜度。
- BI数据和AI数据规模不对称:Zilliz在百亿级向量数据级别时仍能提供稳定的快速响应和并发性,向量数据不再是一个补充性的“小众”数据,实现BI数据和AI数据在规模上的并驾齐驱。
-
业务升级:以最简单的方案将传统的数据分析升级到增强分析,实现BI+AI的融合。
方案组成

- 云器Lakehouse平台:提供数据湖和数据仓库的管理,包括从数据管理、数据集成、任务开发、任务执行、工作流workflow编排、任务监控运维等。
- 云器Zettapark:通过Python+Dataframe的编程方式,实现csv文件的加载。
- Zilliz向量数据库:性能出色、性价比高的向量数据库。
- Zelliz Data Pipeline:对文本、图片、文件等数据进行向量化存储和检索,支持中英文Embedding模型、Rerank模型,提供极致简化和开发者友好的的向量处理方式。
应用场景示例:通过语义检索增强文本搜索
- 标量检索:云器Lakehouse提供基于文本的Like模糊匹配、基于文本倒排索引的关键字搜索。
- 向量检索:Zilliz提供基于向量数据的语义检索和Rerank模型的结果精排。
将标量检索和向量检索结合,提高检索的性能、准确性,适用于产品搜索、产品推荐等场景。
任务一:加载原始数据到云器Lakehouse
云器Lakehouse提供多种加载csv数据的方式,包括WEB方式的离线数据同步、通过数据湖加载csv等。本文采用云器Zettapark方式,实现数据加载,python代码运行在云器Lakehouse的Python任务节点里。

代码如下:
# ********************************************************************#
# author: qiliang
# create time: 2024-09-14 10:10:26
# ********************************************************************#
from clickzetta.zettapark.session import Session
hints = dict()
hints['sdk.job.timeout'] = 3
hints['query_tag'] = 'test_conn_hints_zettapark'
connection_parameters = {
"username": "qiliang",
"password": "",
"service": "uat-api.clickzetta.com",
"instance": "",
"workspace": "ql_ws",
"schema": "wayfair_wans",
"vcluster": "default",
"sdk_job_timeout": 10,
"hints": hints,
}
session = Session.builder.configs(connection_parameters).create()
import os
import pandas as pd
import warnings
# 忽略 FutureWarning
warnings.filterwarnings("ignore", category=FutureWarning)
# 文件 URL 数组
urls = [
'https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/label.csv',
'https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/product.csv',
'https://raw.githubusercontent.com/wayfair/WANDS/main/dataset/query.csv'
]
for url in urls:
# 提取文件名并去掉扩展名
table_name = os.path.basename(url).split('.')[0]
data = pd.read_csv(url, delimiter='\t') # Specify the delimiter as tab
# Replace spaces in column names with underscores and convert to lowercase
data.columns = [col.replace(" ", "_").lower() for col in data.columns]
# Check if the table exists
try:
session.table(table_name)
table_exists = True
except Exception:
table_exists = False
# Create the table if it doesn't exist
if not table_exists:
column_definitions = ", ".join([f"{col} STRING" for col in data.columns])
session.sql(f"CREATE TABLE {table_name} ({column_definitions})")
df = session.create_dataframe(data)
df.write.save_as_table(table_name, mode="overwrite", table_type="transient")
print(f"Data from {filepath} written to table {table_name}")
# Drop tables starting with 'zettapark_temp_table_'
try:
tables = session.sql("SHOW TABLES LIKE 'zettapark_temp_table_%'").collect()
for table in tables:
table_name = table['table_name']
session.sql(f"DROP TABLE IF EXISTS {table_name}").collect()
print(f"temp Table {table_name} dropped")
except Exception as e:
print(f"Error dropping temp tables: {e}")
# Close the session
session.close()
然后在云器Lakehouse控制台检查结果:


任务二:开发SQL任务,为BI准备数据

代码如下:
--LAKEHOUSE SQL
--********************************************************************--
-- author: qiliang
-- create time: 2024-09-12 15:17:34
--********************************************************************--
-- DROP TABLE if exists product_cleaned;
CREATE TABLE if not exists product_cleaned(
`product_id` bigint,
`product_name` string,
`product_class` string,
`category_hierarchy` string,
`product_description` string,
`product_features` string,
`rating_count` double,
`average_rating` double,
`review_count` double,
`product_text` string,
`product_full_text` string,
`product_features_json` json,
index inverted_index_product_text (product_text) using inverted,
index inverted_index_product_full_text (product_full_text) using inverted,
);
INSERT OVERWRITE product_cleaned
SELECT
*,
COALESCE(product_description, product_name) as product_text,
CONCAT(
COALESCE(product_name, ''), ';',
COALESCE(product_description, ''), ';',
COALESCE(product_class, ''), ';',
COALESCE(category_hierarchy, ''), ';',
COALESCE(product_features, '')
) as product_full_text,
JSON_PARSE(
to_json(
map_from_entries(
transform(
split(product_features, '\\|'),
entry -> struct(
trim(split(entry, ':')[0]),
trim(split(entry, ':')[1])
)
)
)
) AS product_features_json,
FROM product);
任务三:创建Zillize Data Ingestion Pipeline
Zilliz Cloud Pipelines 能够简化将非结构化数据转换为 Embedding 向量的流程,并对接 Zilliz Cloud 向量数据库存储向量数据,实现高效的向量索引和检索。开发人员在处理非结构化数据时,时常面临复杂的非结构化数据转换和检索问题,这会降低开发速度。Zilliz Cloud Pipelines 通过提供一体化解决方案来应对这一挑战,帮助开发人员轻松将非结构化数据转换为可搜索的向量,并对接 Zilliz Cloud 向量数据库确保高质量的向量检索。

获取新创建的Pipiline的客户端代码,作为下一步的代码输入:

任务四:开发Python任务在工作流里调用Zilliz Data Ingestion Pipeline API,为AI增强分析准备数据,实现向量数据的ETL自动化。
将云器Lakehouse的表名 为product的的文本信息送到Zilliz,对文本数据先进行Embedding,然后进行向量化存储

在云器Lakehouse里执行完上述代码后,到Zilliz控制台查看,验证向量化结果:


任务五:通过云器Lakehouse工作流编排定义完整的数据流程
分别给上述任务设置调度属性并提交,构建数据工作流:

任务六:创建Zilliz Data Search Pipeline
使用 Zillize Data Search Pipeline 能够快速高效将查询文本转换为 Embedding 向量,返回最相关的 top-K 个文档块(包括文本和元数据),有效地从搜索结果中获取数据洞见。

任务七:通过Zilliz API进行数据分析
import http.client
import json
conn = http.client.HTTPSConnection("controller.api.gcp-us-west1.zillizcloud.com")
headers = {
'Authorization': "Bearer ******",
'Accept': "application/json",
'Content-Type': "application/json"
}
search_without_rerank_payload = "{\"data\":{\"query_text\":\"black 5 drawer dresser by guilford\"},\"params\":{\"limit\":20,\"offset\":0,\"outputFields\":[],\"filter\":\"id >= 0\"}}"
conn.request("POST", "/v1/pipelines/pipe-e46ae76b70773f85543c93/run", search_without_rerank_payload, headers)
res = conn.getresponse()
data = res.read()
# Decode the response data
decoded_data = data.decode("utf-8")
# Parse the JSON data
parsed_data = json.loads(decoded_data)
# Pretty-print the JSON data
pretty_json = json.dumps(parsed_data, ensure_ascii=False, indent=4)
print(pretty_json)