作者:云器科技产品经理 唐晨
在大数据及AI时代,高效准确的信息检索已成为我们日常生活和工作中不可或缺的一部分。无论是在搜索引擎中寻找答案,还是在AI大模型通过对话获得信息,或是再企业数据库中查找关键信息,我们都在不断地与信息检索打交道。
然而,随着大数据和GenAI生成式AI的飞速发展,许多拥有海量数据资源的企业在思考,如何将强大的生成式AI与企业自有数据结合,依托生成式AI的技术来提取企业个性化的数据资源,是企业数据资产价值提升的新方式。
在这一背景下,本文将从传统的技术栈出发,到全新融合技术的探讨和案例实践收尾。通过这个梳理,您将了解传统的向量召回和模糊检索技术,分别了解它们如何通过各自独特的方式提升了检索效率和准确性。进而,本文会谈到当这两种技术与生成式AI紧密集成时,一种全新的信息检索范式如何形成融合,以及如何带来实践效果,为企业数据应用带来新的可能,为用户体验带来革命性的提升。
向量召回技术:精准匹配的利器
向量召回技术是信息检索领域的一项创新,它通过将文本转换为高维向量,并利用余弦相似度等方法快速进行语义相似性匹配,从而实现高效的文本检索。
向量召回技术基本原理如下:
首先,向量召回技术的核心在于将文本数据转换为数值型向量。这一过程通常包括文本预处理、词嵌入生成、向量空间模型构建等步骤。其中,预处理步骤涉及分词、去除停用词等操作,以净化文本数据。然后,通过神经网络模型,如 Word2Vec 或 BERT,将文本映射到高维空间,形成词向量或句子向量。最后,通过计算向量之间的距离(如余弦相似度),评估文本间的相似性。
向量召回技术的优点主要包括:
- 高精度:能够快速捕捉文本间的语义相似性,提供高精度的匹配结果。
- 快速性:基于向量空间的计算可以迅速完成匹配,尤其适用于大规模数据集,保证了即使在庞大的数据集中也能快速检索,极大地提升了检索效率。
然而,向量召回技术也存在一些缺点:
- 对噪声敏感:在处理含噪声数据时,向量召回技术的效果可能会受到影响。
- 依赖语义理解:技术实现对文本的深度语义理解要求较高,需要复杂的模型和训练过程,这不仅增加了技术实现的难度,也提高了对计算资源的需求。
模糊检索技术:容错性的代名词
与向量召回技术刚好相反,模糊检索技术具有很高的容错性,专注于处理不完整或带有噪声的数据,通过关键词匹配来实现检索,为检索系统提供了强大的灵活性。模糊检索技术基于关键词匹配,使用正则表达式、编辑距离等算法,对用户的查询进行宽松的匹配,允许一定程度的偏差存在。这种宽容的态度,使得即使在面对拼写错误或不完整输入时,也能提供相关且有用的检索结果。
模糊检索技术的优点主要包括:
- 高容错性:能够有效处理不完整或噪声数据,对于拼写错误具有很好的容忍度,提供更为人性化的检索体验。
- 适用性广:适用于多种查询场景,不受限于特定的查询场景,能够灵活应对各种复杂的检索需求,特别是当用户不确定具体查询词时。
模糊检索技术也存在一些缺点,比如:
- 精度较低:相比向量召回,模糊检索在追求高容错性的同时,可能会牺牲一部分检索的精度和效率。
- 过度泛化:宽松的匹配策略有时会导致检索结果过于泛化,使得用户需要在更多的信息中筛选出真正需要的内容。
融合技术:优势与实施
融合技术旨在结合向量召回的高精度和模糊检索的高容错性,取长补短,提高检索的准确性和鲁棒性。融合技术结合两种技术优势,可以在保持高精确度的同时处理更多类型的查询。同时也能够更好地适应各种数据环境,更好地处理复杂和多样化的数据,即使在数据质量不佳的情况下也能提供稳定的表现。
融合技术的两大核心为:
- 归一化:确保不同来源的分数具有可比性,便于后续处理。
- 熵计算:衡量结果分布的不确定性,为调整结果权重提供依据。
融合技术的实施步骤主要如下:
- 关键词提取:从用户提问中提取关键词,为后续处理提供基础。
- 向量表示:利用预训练的模型将文本转换为向量形式。
- 向量库检索:在构建的向量库中进行相似度匹配,快速召回候选结果。
- 模糊搜索过滤:使用模糊检索技术对向量召回的结果进行过滤,去除不相关项。
- 结果融合:通过归一化和熵计算等方法对两种召回结果进行加权融合,形成最终的推荐列表。
实施案例:基于云器 Lakehouse 实现融合检索、进行合同文档的 AI 辅助审查
案例概述
云器科技推出的云器 Lakehouse 是一款下一代“多云、一体化”数据平台。作为湖仓一体产品,云器 Lakehouse 通过 Open Lakehouse 架构实现对多源数据的统一管理和链接,为 AI 应用提供开放、灵活的数据访问。这种统一的数据湖策略有助于最大化 Data+AI 应用生态,支持企业在 AI 驱动的数据分析和模型训练中实现创新。在合同文档审查的落地实现案例中,我们基于云器 Lakehouse 提供的产品能力,采用向量召回和模糊检索技术来辅助识别合同中的关键信息,并对其进行完整性和一致性的检查。通过本案例可以了解如何将先进的信息检索技术应用于实际业务场景,来大幅提高工作效率和准确性。
主要处理流程
- PDF文档预处理:首先使用PDFPlumberLoader从合同PDF文档中提取文本内容,并利用RecursiveCharacterTextSplitter进行文本分割,以适应后续处理流程。
- 数据流处理:通过langchain库中的组件,将分割后的文本数据流进行处理,包括清洗、格式化,并准备进行向量化处理。
- 向量化与索引构建:文本数据通过Embedding技术转换为向量形式,并存储在Lakehouse中,构建倒排索引以便于快速检索。
- 检索与重排:利用LLM Ops和Xinference平台,对合同文本进行向量召回和模糊检索,并通过bge-reranker-large模型对检索结果进行重排,以优化检索结果的相关性。
- 评审意见内容生成:使用llama3:8b-instruct-fp16模型,根据检索到的合同内容生成审查意见和缺失信息的提示。
使用到的关键组件
- PDF文件处理:langchain_community.document_loaders、langchain.text_splitter
- 数据湖:云器Lakehouse,存储和管理PDF文件
- 数据仓库:云器Lakehouse,存储和管理Table
- 文本Search:云器Lakehouse倒排索引
- 向量检索:云器Lakehouse Vector
- LLM Ops:Ollama、Xinference
- Embedding模型:snowflake_arctic_embed on Ollama、bge_m3 on Xinference
- Rerank模型:bge-reranker-large on Xinference
- Generate模型:llama3:8b-instruct-fp16 on Ollama、llama-3-instruct-70 on Xinference
方案细节
Dataflow
- PDFPlumberLoader->split_documents->Dataframe->Bulkload into Lakehouse->Embedding->Retrieve->Rerank->Generate
创建云器Lakehouse Volume&Table
pip install -q clickzetta-sqlalchemy -i https://pypi.tuna.tsinghua.edu.cn/simple
from clickzetta import connect
from clickzetta.bulkload.bulkload_enums import BulkLoadOptions,BulkLoadOperation,BulkLoadCommitOptions
import json,requests
import os
from datetime import datetime
conn = connect(
username='**********',
password='**********',
service='uat-api.clickzetta.com',
instance='jnsxwfyr',
workspace='ql_ws',
schema='vector_test',
vcluster='default'
)
# 创建游标对象
cursor = conn.cursor()
Volume,数据湖存储PDF的位置
datalake_sql = """
-- CREATE Datalake Volumes
CREATE EXTERNAL VOLUME if not exists contract_volume
LOCATION 'oss://***/pdf/合同数据'
USING connection hz_qiliang_conn_ak -- storage Connection
DIRECTORY = (
enable = TRUE --
)
recursive = TRUE; --
"""
alter_datalake_sql = """
alter volume contract_volume refresh;
"""
# 执行 SQL 查询
cursor.execute(alter_datalake_sql)
cursor.execute("select * from directory(volume contract_volume)")
results = cursor.fetchall()
for row in results:
print(row)
fadada_pdf_text_chunk_staging_sql = """
CREATE TABLE IF NOT EXISTS fadada_pdf_text_chunk_staging (
metadata_source string,
metadata_file_path string,
metadata_page int,
metadata_total_pages int,
metadata_Producer string,
metadata_Creator string,
metadata_CreationDate string,
metadata_ModDate string,
page_content string,
bge_m3_embedding_page_content string,
page_metadata string,
created_at timestamp,
);
"""
fadada_pdf_text_chunk_sql = """
CREATE TABLE IF NOT EXISTS fadada_pdf_text_chunk (
metadata_source string,
metadata_file_path string,
metadata_page int,
metadata_total_pages int,
metadata_Producer string,
metadata_Creator string,
metadata_CreationDate string,
metadata_ModDate string,
page_content string,
page_content_embedding_snowflake_arctic_embed_1024 ARRAY < float >, --存储向量数据
page_content_embedding_bge_m3_1024 ARRAY < float >, --存储向量数据
page_metadata string,
created_at timestamp,
INDEX snowflake_arctic_embedding_idx_page_content (page_content_embedding_snowflake_arctic_embed_1024) USING vector, -- create vector index
INDEX bge_m3_embedding_idx_page_content (page_content_embedding_bge_m3_1024) USING vector -- create vector index
) USING parquet;
--create index inverted index of: page_content
CREATE inverted INDEX IF NOT EXISTS inverted_index_page_content ON TABLE fadada_pdf_text_chunk (page_content) PROPERTIES ('analyzer' = 'unicode');
"""
# 执行 SQL 查询
cursor.execute(fadada_pdf_text_chunk_staging_sql)
cursor.execute(fadada_pdf_text_chunk_sql)
提取PDF文档的文本内容并对文本进行Chunk
pip install -q langchain -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install -q langchain_community -i https://pypi.tuna.tsinghua.edu.cn/simple
import pandas as pd
import os
chunk_size=512
chunk_overlap=0
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
df = pd.DataFrame(columns=['page_content', 'metadata'])
# PDF文件存储在云器Lakehouse数据湖Volume的方式
fetch_pdf_url_sql = """
-- Fetch the pdfs in volume
SELECT relative_path,
replace(
GET_PRESIGNED_URL (volume contract_volume, relative_path, 3600),
"-internal",
""
) AS pre_signed_url
FROM DIRECTORY (volume contract_volume);
"""
cursor.execute(fetch_pdf_url_sql)
results = cursor.fetchall()
# 遍历数据湖Volume下的所有.pdf文件
def extract_text_from_pdf_on_vlume(df):
i=0
for row in results:
if row[0].endswith(".pdf"):
pdf_path = row[1]
print(row[0],row[1])
# 加载PDF文件
try:
# 尝试加载 PDF 文件
documents = PDFPlumberLoader(pdf_path).load()
except Exception as e:
print(f"An error occurred: {e}")
pass
# 对每一个pdf文件的documents进行chunk
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
texts = text_splitter.split_documents(documents)
# 遍历texts列表
for text in texts:
# 提取page_content和metadata
page_content = text.page_content
metadata = text.metadata
# 创建一个临时DataFrame来存储当前文本块的信息
temp_df = pd.DataFrame({'page_content': [page_content], 'metadata': [metadata]})
# 使用concat而不是append
df = pd.concat([df, temp_df], ignore_index=True)
return df
df = extract_text_from_pdf_on_vlume(df)
df.head(5)
用不同的Embedding模型对文本数据向量化
# ollma管理的模型
def get_embeddings_ollam(_prompt):
url = 'http://106.120.**.***:11434/api/embeddings'
data = {
"model": "snowflake-arctic-embed",
"prompt": f'{_prompt}'
}
# 发送POST请求
response = requests.post(url, data=json.dumps(data)).json()
return response["embedding"]
# xinference管理的模型
from xinference.client import Client
def get_embeddings_xinference(_prompt):
client = Client("http://host.docker.internal:9998")
model = client.get_model("bge-m3")
input = _prompt
embedding = model.create_embedding(input)
embedding_value = embedding['data'][0]['embedding']
return embedding_value
批量上传数据到云器Lakehouse
if bulkload_stream:
print("Successfully connected to the ClickZetta Lakehouse")
writer = bulkload_stream.open_writer(0)
for index, row in df.iterrows():
# 将DataFrame的行转换为JSON格式
row_json = json.dumps({
'page_content': row['page_content'],
'metadata': row['metadata']
})
bulk_row = writer.create_row()
page_content = row['page_content'].replace(' ', '')
page_content = row['page_content'].replace('本文档仅供测试用途', '')
bulk_row.set_value('page_content', page_content)
bulk_row.set_value('page_metadata', str(row['metadata']))
bulk_row.set_value('created_at', datetime.now())
for key, value in row['metadata'].items():
bulk_row.set_value("metadata_"+key.lower(), value)
filename = os.path.basename(row['metadata']['source'])
bulk_row.set_value("metadata_source", filename)
page_content_embedding = get_embeddings_xinference(page_content)
bulk_row.set_value("bge_m3_embedding_page_content",str(page_content_embedding))
writer.write(bulk_row)
# 关闭writer并提交数据
writer.close()
bulkload_stream.commit()
insert_overwrite_fadada_pdf_text_chunk_sql = """
INSERT OVERWRITE fadada_pdf_text_chunk
SELECT metadata_source,
metadata_file_path,
metadata_page,
metadata_total_pages,
metadata_producer,
metadata_creator,
metadata_creationDate,
metadata_modDate,
page_content,
cast(
vector_test.get_embeddings (page_content) AS ARRAY < float >
) AS page_content_embedding_snowflake_arctic_embed_1024,
cast(split(trim(trim(bge_m3_embedding_page_content, ']'),'['),',') as array<float>) AS page_content_embedding_bge_m3_1024,
page_metadata string,
created_at
FROM fadada_pdf_text_chunk_staging;
"""
my_param = {
'hints': {
'cz.sql.remote.udf.enabled': 'TRUE'
}
}
# 执行 SQL 查询
cursor.execute(insert_overwrite_fadada_pdf_text_chunk_sql,parameters=my_param)
融合检索内容(向量检索、模糊检索和精准搜索)
search_content = '临汾**科技有限公司'
search_content_embedding = 'ARRAY(' + str(get_embeddings_xinference(search_content)).replace("[",'').replace("]",'') + ')'
# --通过向量检索与标量检索,搜索合同文本内容
search_sql = f"""
WITH prompt_vector AS (
SELECT "prompt_vector_1024" AS prompt_type,
page_content,
COSINE_DISTANCE (
page_content_embedding_bge_m3_1024,
{search_content_embedding}
) AS distance,
metadata_source,
metadata_total_pages
FROM fadada_pdf_text_chunk
ORDER BY distance
LIMIT 5
),
prompt_scalar_match_all AS (
SELECT "prompt_scalar_match_all" AS prompt_type,
page_content,
0 AS distance,
metadata_source,
metadata_total_pages
FROM fadada_pdf_text_chunk
WHERE match_all (
page_content,
'临汾**科技有限公司',
map("analyzer", "unicode")
)
LIMIT 5
),
prompt_scalar_match_any AS (
SELECT "prompt_scalar_match_any" AS prompt_type,
page_content,
0.5 AS distance,
metadata_source,
metadata_total_pages
FROM fadada_pdf_text_chunk
WHERE match_any (
page_content,
'临汾**科技有限公司',
map("analyzer", "unicode")
)
LIMIT 5
)
SELECT * FROM prompt_vector
UNION ALL
SELECT * FROM prompt_scalar_match_all
UNION ALL
SELECT * FROM prompt_scalar_match_any;
"""
my_param = {
'hints': {
'cz.sql.remote.udf.enabled': 'TRUE'
}
}
# 执行 SQL 查询
cursor.execute(search_sql,parameters=my_param)
# 获取查询结果
results = cursor.fetchall()
for row in results:
print(row)
对检索出的结果进行重新排序
page_contents = [row[1] for row in results]
from xinference.client import Client
client = Client("http://host.docker.internal:9998")
model = client.get_model("bge-reranker-large")
query = search_content
corpus = page_contents
rerank_list = model.rerank(corpus, query)
rerank_list
rerank_list['results']
合同内容审查意见生成
import requests
input = '"'+ rerank_list['results'][0]['document'] + ' 上述合同文本里缺失了哪些关键信息?' + '"'
data = { "model": "llama3:8b-instruct-fp16", "prompt": input, "stream": False}
# 请求 ollamaresponse = requests.post('http://host.docker.internal:11434/api/generate', json=data)
# 解析 JSON 数据data = json.loads(response.text)
# 提取 'response' 字段的值response_value = data['response']print(response_value)
返回结果
After reviewing the contract text, I've identified some key information that is missing:
1. **统一社会信用代码/身份证号** (Unified Social Credit Code/ID Number) for both parties (甲方 and 乙方).
2. **法定代表人** (Legal Representative) of 甲方.
3. **委托代理人** (Authorized Representative) of 甲方, including their job title and contact information.
4. **地址** (Address) of ** (乙方), which is only partially provided as "**市**区黄河五路**号**层-**" without the complete address.
5. **法定代表人** (Legal Representative) of 乙方.
6. **委托代理人** (Authorized Representative) of 乙方, including their job title and contact information.
Additionally, some clauses may be incomplete or lacking specific details, such as:
1. The terms and conditions for rent payment, late fees, and penalties.
2. The scope of responsibilities and liabilities for both parties.
3. The process for resolving disputes or conflicts.
4. Any specific regulations or restrictions on the use of the premises.
Please note that this is a test contract document, so some information might be intentionally omitted or incomplete.
方案成果
在本案例中,通过向量召回技术,系统得以在合同文本与标准模板之间迅速找到匹配项,高效地识别出任何潜在的不一致性,为合同审查工作提供了强大的语义比对工具。同时,模糊检索技术的应用增强了系统对不完整或含噪声的合同文本的容错处理能力,确保了审查过程的全面性和深度,即使在信息缺失或不精确的情况下也能保持审查的连续性和有效性。此外,融合技术的运用巧妙地结合了向量召回的精确性和模糊检索的鲁棒性,进一步提升了合同审查的准确性和效率。最终的合同审查,系统能够准确识别出合同中的关键信息,包括双方的统一社会信用代码/身份证号码、法定代表人、授权代表及其职位和联系信息等。针对地址信息不完整或条款细节缺失的情况,系统会给予明确提示,指导用户补充相关内容。这种技术的结合优化了审查流程,提高了工作效率,确保了审查结果的高质量和高可靠性。
结语:融合技术开启数据信息检索新篇章
通过本文所介绍案例可以看到,新一代融合的Lakehouse数据平台正在展现巨大的潜力,以向量召回、模糊检索及其融合技术为例,融合方案不仅为信息检索领域带来了全新的解决思路,更在实践中证明了云上湖仓平台的创新潜力和其卓越的性能表现:
- 提升检索精度:融合技术显著提高了信息检索的准确性,使得用户能够更快速、更精准地找到所需信息。
- 增强系统鲁棒性:通过结合不同检索方法的优势,融合技术大大提升了系统的稳定性和可靠性,能够更好地应对各种复杂的检索场景。
- 优化用户体验:更准确、更快速的检索结果直接转化为更好的用户体验,这对于提高工作效率和用户满意度至关重要。
随着人工智能技术的不断进步,我们有理由相信,类似的融合技术将在未来的企业信息管理任务中发挥更加重要的作用。它有望为企业带来更加精准、高效的信息服务,进一步提升数据利用的效率和价值。
云器Lakehosue
云器 Lakehouse 作为人工智能时代新一代 Data+AI 数据平台的领军者,成功实现了对多种来源数据的统一管理和连接,为 AI 应用提供了开放、灵活的数据访问。在 AI-Ready Data 方面,云器 Lakehouse 积极进取,采用了先进的技术架构并构建了丰富的产品功能,可以帮助您灵活应对复杂场景的数据分析洞察需求:
-
一体化湖仓架构:云器 Lakehouse 的架构设计使其能够在一体化湖仓架构上运行。这得益于其创新的数据管理和治理层,以及与云基础设施的紧密结合。它采用了如 Delta Lake 和 Iceberg 等湖格式,以提供数据管理特性和高效的访问性能。同时,云器 Lakehouse 支持 AI 和 BI 等不同场景下多样化的数据分析和计算需求。
-
批处理、流处理和交互式分析的统一:云器 Lakehouse 致力于实现批处理、流处理和交互式分析三种分析模式的统一,为企业提供了一个简单易用、高性能、低成本的数据平台。利用 Single-Engine 引擎,云器 Lakehouse 能够支持批处理、流处理和交互式分析等多种数据分析场景。这不仅提高了 BI 分析的效率,还通过实时数据处理能力,确保了数据的即时更新和分析结果的实时性,满足企业对数据时效性的高要求。
-
AI 智能化应用:云器 Lakehouse 将 AI 技术深度融合到数据平台中,通过自动化的数据管理、智能优化查询性能和资源分配,实现了数据平台的自我优化和智能化管理。这种智能化应用不仅提高了数据处理的效率,也为企业节约了成本。云器 Lakehouse 平台中的 DataGPT 产品,可大幅提升数据处理和分析的能力,尤其是在自然语言处理和知识提取方面,可以帮助企业用户更加高效地从大量文本数据中获取洞察力,并支持更复杂的数据分析和业务决策。
欢迎点击上方联系我们,了解更多案例详情!