AI 文摘

高级RAG检索策略之混合检索





作者: 极客与黑客之路 来源: 极客与黑客之路

古人云:兼听则明,偏信则暗 ,意思是要同时听取各方面的意见,才能正确认识事物,只相信单方面的话,必然会犯片面性的错误。在 RAG(Retrieval Augmented Generation)应用中也是如此,如果我们可以同时从多个信息源中获取信息,那么我们的检索结果会更加全面和准确。今天我们就来介绍高级 RAG 检索策略中的混合检索,并在实际操作中结合 ElaticSearch 和 Llama3 来实现混合检索的效果。

原理介绍

混合检索也叫融合检索,也叫多路召回,是指在检索过程中,同时使用多种检索方式,然后将多种检索结果进行融合,得到最终的检索结果。混合检索的优势在于可以充分利用多种检索方式的优势,弥补各种检索方式的不足,从而提高检索的准确性和效率,下面是混合检索的流程图:

  • 首先是问题查询,这一过程的设计可以简单也可以复杂,简单的做法是直接将原始查询传递给检索器,而复杂一点的做法是通过 LLM(大语言模型)为原始查询生成子查询或相似查询,然后再将生成后的查询传递给检索器

  • 然后是检索器执行检索,检索可以在同一数据源上进行不同维度的检索,比如向量检索和关键字检索,也可以是在不同数据源上进行检索,比如文档和数据库

  • 检索过程从原来一个问题变成了多个问题检索,如果串行执行这些检索,那么检索的效率会大大降低,所以我们需要并行执行多个检索 ,这样才可以保证检索的效率

  • 最后是融合检索结果,在这一过程中,我们需要对检索结果进行去重,因为在检索的多个结果中,有些结果可能是重复的,同时我们还需要对检索结果进行排序,排序方法一般采用 RRF(倒数排名融合),选出最匹配的检索结果

环境准备

为了更好地了解混合检索的原理和实现,今天我们将通过 LLM 应用框架LlamaIndex[1],结合 Meta 最新开源的模型Llama3[2]和开源搜索引擎ElasticSearch[3],来实现一个高效的混合检索系统。在 RAG 检索过程中除了需要用到 LLM 的模型外,还需要用到 Embedding 模型和 Rerank 模型,这些模型我们也统一使用本地部署的模型,这样可以更好地了解各种模型的使用和部署。

LlamaIndex 集成 Llama3

首先是进行 Llama3 的本地化部署,有多种工具可以部署 Llama3,比如 Ollama[4] 或 vllm[5],而且这些工具都提供了兼容 OpenAI 的 API 接口,vllm 的部署方式可以参考我之前的这篇文章

部署完成后,我们再看如何在 LlamaIndex 中集成 Llama3。虽然 LlamaIndex 提供了自定义 LLM[7]的功能,但继承自CustomeLLM 类来实现自定义 LLM 的方式比较复杂,需要从头实现complete 或chat 等方法。这里推荐 LlamaInex 另外一个创建自定义 LLM 的方法,即使用OpenAILike 类,这个类是对 OpenAI 类进行轻量级封装,只要有兼容 OpenAI 的 API 服务,就可以直接使用该类来获得 OpenAI LLM 的功能。

要使用OpenAILike 类,首先需要安装相关依赖包pip install llama-index-llms-openai-like ,然后使用以下代码进行集成:

from llama_index.llms.openai_like import OpenAILike  
from llama_index.core.base.llms.types import ChatMessage, MessageRole  
from llama_index.core import PromptTemplate  
  
llm = OpenAILike(  
    model="llama3",  
    api_base="you-local-llama3-api",  
    api_key="fake_key",  
    is_chat_model=True,  
)  
prompt_str = "Please generate related movies to {movie_name}"  
prompt_tmpl = PromptTemplate(prompt_str)  
response = llm.chat(  
    [  
        ChatMessage(  
            role=MessageRole.SYSTEM,  
            content="You are a helpful assistant.",  
        ),  
        ChatMessage(  
            role=MessageRole.USER,  
            content=prompt_tmpl.format(movie_name="Avengers"),  
        ),  
    ]  
)  
print(f"response: {response}")  
  
# 显示结果  
response: assistant: Here are some movie recommendations that are similar to the Avengers franchise:  
  
1.**Guardians of the Galaxy** (2014) - Another Marvel superhero team-up film, with a fun and quirky tone.  
2.**The Justice League** (2017) - A DC Comics adaptation featuring iconic superheroes like Superman, Batman, Wonder Woman, and more.  
......  
  • 在OpenAILike 对象中,参数model 为模型名称,api_base 为本地 Llama3 的 API 服务地址

  • api_key 可以随便填写,但不能不传这个参数,否则会出现连接超时的错误

  • is_chat_model 为是否是 chat 模型,因为 OpenAI 的模型分为 chat 模型和非 chat 模型

  • 然后我们使用 LLM 对象进行了一个普通的对话,结果可以正常返回

LlamaIndex 集成 ElasticSearch

在 RAG 应用中向量数据库是必不可少的一项功能,而 Elasticsearch 能够存储各种类型的数据,包括结构化和非结构化数据,并且支持全文检索和向量检索。ElasticSearch 本地环境的安装和部署可以参考我之前的这篇文章

部署完 ElasticSearch 后,还需要安装 LlamaIndex 的 Elasticsearch 依赖包pip install llama-index-vector-stores-elasticsearch ,然后使用以下代码示例就可以集成 ElasticSearch:

from llama_index.vector_stores.elasticsearch import ElasticsearchStore  
  
es = ElasticsearchStore(  
    index_name="my_index",  
    es_url="http://localhost:9200",  
)  
  • index_name 是 ElasticSearch 的索引名称,es_url 是 ElasticSearch 服务的地址

自定义 Embedding 和 Rerank 模型

在高级 RAG 的检索过程中,需要用到 Embedding 模型来对文档和问题进行向量化,然后使用 Rerank 模型对检索结果进行重排序。同样有很多工具可以部署这 2 种模型,比如TEI[9] 和 Xinference[10]等。这里我们使用 TEI 来部署这 2 种模型,TEI 和模型的部署可以参考我之前的这篇文章

Embedding 模型的启动命令如下,这里我们使用了BAAI/bge-base-en-v1.5[12]这个 Embeddings 模型,服务端口为 6006:

text-embeddings-router --model-id BAAI/bge-base-en-v1.5 --revision refs/pr/4 --port 6006  

Rerank 模型的启动命令如下,这里我们使用了BAAI/bge-reranker-base[13]这个 Rerank 模型,服务端口为 7007:

text-embeddings-router --model-id BAAI/bge-reranker-base --revision refs/pr/4 --port 7007  

多种检索方式

数据入库

在介绍检索之前,我们先来了解下 LlamaIndex 如何使用 ElasticSearch 对文档进行解析和入库,这里的测试文档还是用维基百科上的复仇者联盟[14]电影剧情,示例代码如下:

from llama_index.vector_stores.elasticsearch import ElasticsearchStore  
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, StorageContext  
from llama_index.core.node_parser import SentenceSplitter  
from llms import CustomEmbeddings  
  
store = ElasticsearchStore(  
    index_name="avengers",  
    es_url="http://localhost:9200",  
)  
documents = SimpleDirectoryReader("./data").load_data()  
node_parser = SentenceSplitter(chunk_size=256, chunk_overlap=50)  
storage_context = StorageContext.from_defaults(vector_store=store)  
embed_model = CustomEmbeddings(  
    model="BAAI/bge-base-en-v1.5", url="http://localhost:6006"  
)  
VectorStoreIndex.from_documents(  
    documents,  
    transformations=[node_parser],  
    embed_model=embed_model,  
    storage_context=storage_context,  
)  
  • 首先定义了一个 ElasticsearchStore 对象来连接 ElaticSearch 本地服务

  • 然后使用 SimpleDirectoryReader 加载本地的文档数据

  • 使用 SentenceSplitter 对文档进行分块处理,应为 TEI 的输入 Token 数最大只能 512,所以这里的 chunk_size 设置为 256,chunk_overlap 设置为 50

  • 构建 StorageContext 对象,指定向量存储为之前定义的 ElasticsearchStore 对象

  • 创建一个自定义 Embeddings 对象,使用的是 TEI 部署的 Embeddings 模型服务,这里CustomEmbeddings 的代码可以参考这篇文章中的代码

  • 最后使用 VectorStoreIndex 对象将文档数据入库

当执行完代码后,可以在 ElasticSearch 的avengers 索引中看到文档数据,如下图所示:

全文检索

数据入库后,我们再来看下如何在 LlamaIndex 中使用 Elasticsearch 进行全文检索。

全文检索是 Elasticsearch 的基本功能,有时候也叫关键字检索,是指根据关键字在文档中进行检索,支持精确匹配,同时高级功能也支持模糊匹配、同义词替换、近义词搜索等。在 LlamaIndex 中使用 Elasticsearch 进行全文检索的代码如下:

from llama_index.vector_stores.elasticsearch import AsyncBM25Strategy  
from llama_index.core import Settings  
  
text_store = ElasticsearchStore(  
    index_name="avengers",  
    es_url="http://localhost:9200",  
    retrieval_strategy=AsyncBM25Strategy(),  
)  
Settings.embed_model = embed_model  
text_index = VectorStoreIndex.from_vector_store(  
    vector_store=text_store,  
)  
text_retriever = text_index.as_retriever(similarity_top_k=2)  
  • 这里重新定义了一个 ElasticsearchStore 对象,但这次指定了检索策略为 BM25,如果要使用全文检索则必须指定这个检索策略

  • 将ElasticsearchStore 对象作为参数来创建VectorStoreIndex 对象

  • 最后通过VectorStoreIndex 对象创建全文检索的检索器,这里设置检索结果的数量为 2

BM25 是一种在信息检索领域广泛采用的排名函数,主要用于评估文档与用户查询的相关性。该算法的基本原理是将用户查询(query)分解为若干语素(qi),然后计算每个语素与搜索结果之间(document D)的相关性。通过累加这些相关性得分,BM25 最终得出查询与特定文档之间的总相关性评分。这种检索策略在现代搜索引擎中非常常见。

向量检索

我们再来了解 LlamaIndex 中如何使用 Elasticsearch 进行向量检索。

向量检索是一种基于机器学习的信息检索技术,它使用数学向量来表示文档和查询。在 LlamaIndex 中使用 Elasticsearch 进行向量检索有 2 种检索策略,分别是Dense 和Sparse ,这两种策略的区别在于向量的稠密度,Dense 检索的号码每一位都是有用的数字,就像一个充满数字的电话号码,而Sparse 检索的号码大部分都是零,只有少数几个位置有数字,就像一个电话号码大部分是零,只有几个位置有数字。如果需要更精细、更复杂的检索方法,用Dense 检索,如果需要简单快速的方法,用Sparse 检索。ElasicsearchStore 类默认的检索策略是Dense ,下面是向量检索的代码示例:

from llama_index.vector_stores.elasticsearch import AsyncDenseVectorStrategy, AsyncSparseVectorStrategy  
  
vector_store = ElasticsearchStore(  
    index_name="avengers",  
    es_url="http://localhost:9200",  
    retrieval_strategy=AsyncDenseVectorStrategy(),  
    # retrieval_strategy=AsyncSparseVectorStrategy(model_id=".elser_model_2"),  
)  
Settings.embed_model = embed_model  
vector_index = VectorStoreIndex.from_vector_store(  
    vector_store=vector_store,  
)  
vector_retriever = vector_index.as_retriever(similarity_top_k=2)  
  • 向量检索的代码和全文检索的代码类似

  • 如果是使用Dense 检索策略,可以指定retrieval_strategy=AsyncDenseVectorStrategy() ,也可以不指定retrieval_strategy 参数

  • 如果是使用Sparse 检索策略,需要指定retrieval_strategy=AsyncSparseVectorStrategy(model_id=".elser_model_2") ,这里需要额外部署 ElasticSearch 的 ELSER 模型[16] #### 混合检索

定义好了 2 种检索器后,我们再来了解如何将这些检索进行融合,在 LlamaIndex 的 ElasticsearchStore 类中提供了混合检索的方法,示例代码如下:

from llama_index.vector_stores.elasticsearch import AsyncDenseVectorStrategy  
  
vector_store = ElasticsearchStore(  
    index_name="avengers",  
    es_url="http://localhost:9200",  
    retrieval_strategy=AsyncDenseVectorStrategy(hybrid=True),  
)  
  • 这里的检索策略还是使用Dense 检索策略,但是指定了hybrid=True 参数,表示使用混合检索

设置了混合检索策略后,在融合检索结果时会自动使用 Elasicsearch 的 RRF 功能。

RRF(倒数排名融合) 是一种融合检索算法,用于结合多个检索结果列表。每个结果列表中的每个文档被分配一个分数,分数基于文档在列表中的排名位置。该算法的基本思想是,通过对多个检索器的结果进行融合,来提高检索性能。

但在 Elasticsearch 的免费版本中,这个功能是不可用 的:

因此我们需要自己实现 RRF 功能,RRF 的论文可以看这里[17],下面是 RRF 的代码实现:

from typing import List  
from llama_index.core.schema import NodeWithScore  
  
def fuse_results(results_dict, similarity_top_k: int = 2):  
    """Fuse results."""  
    k = 60.0  
    fused_scores = {}  
    text_to_node = {}  
  
    # 计算倒数排名分数  
    for nodes_with_scores in results_dict.values():  
        for rank, node_with_score in enumerate(  
            sorted(  
                nodes_with_scores, key=lambda x: x.score or 0.0, reverse=True  
            )  
        ):  
            text = node_with_score.node.get_content()  
            text_to_node[text] = node_with_score  
            if text not in fused_scores:  
                fused_scores[text] = 0.0  
            fused_scores[text] += 1.0 / (rank + k)  
  
    # 结果按分数排序  
    reranked_results = dict(  
        sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)  
    )  
  
    # 结果还原为节点集合  
    reranked_nodes: List[NodeWithScore] = []  
    for text, score in reranked_results.items():  
        reranked_nodes.append(text_to_node[text])  
        reranked_nodes[-1].score = score  
  
    return reranked_nodes[:similarity_top_k]  
  • 方法的参数results_dict 是所有检索器的检索结果集合,similarity_top_k 是最相似的结果数量

  • 假设results_dict 的值是{‘full-text’: [nodes], ‘vector’: [nodes]} ,这个方法方法的作用是将所有的检索结果节点进行融合,然后选出最相似的similarity_top_k 个节点

  • 方法开头是初始化一些变量,k 用于计算倒数排名分数,fused_scores 用于存储节点文本和融合后分数的映射,text_to_node 用于存储节点文本到节点的映射

  • 然后是计算每个节点的倒数排名分数,先将 results_dict 中的每个节点按照分数进行排序,然后计算每个节点的倒数排名分数,将结果保存到 fused_scores 中,同时将节点文本和节点的关系保存到 text_to_nodes 中

  • 接着再对 fused_scores 按照倒数排名分数进行排序,得到 reranked_results

  • 然后根据 reranked_results 将结果还原成节点集合的形式,并将节点的分数设置为融合后的分数,最终结果保存到 reranked_nodes 列表中

  • 最后返回最相似的结果,返回 reranked_nodes 列表中的前 similarity_top_k 个节点

定义好融合函数后,我们再定义一个方法来执行多个检索器,这个方法返回的结果就是融合函数的参数 results_dict ,示例代码如下:

from tqdm.asyncio import tqdm  
  
def run_queries(query, retrievers):  
    """Run query against retrievers."""  
    tasks = []  
    for i, retriever in enumerate(retrievers):  
        tasks.append(retriever.aretrieve(query))  
  
    task_results = await tqdm.gather(*tasks)  
  
    results_dict = {}  
    for i, query_result in enumerate(task_results):  
        results_dict[(query, i)] = query_result  
  
    return results_dict  
  • 方法的参数query 是原始问题,retrievers 是多个检索器的集合

  • 将问题传给每个检索器,构建异步任务列表tasks

  • 然后使用await tqdm.gather(*tasks) 来并行 执行所有的检索器,并行执行可以提高检索效率

  • 最后将检索结果保存到results_dict 中,返回results_dict

因为我们使用了异步方式进行检索,原先的CustomEmbeddings 中的方法也需要修改,示例代码如下:

+import asyncio  
  
-    def _aget_query_embedding(self, query: str) -> Embedding:  
-        return get_embedding(text=query, model=self._model, url=self._url)  
+    async def _aget_query_embedding(self, query: str) -> Embedding:  
+        loop = asyncio.get_event_loop()  
+        return await loop.run_in_executor(  
+            None, get_embedding, query, self._model, self._url  
+        )  

然后我们构建一个融合检索器来将上面定义的方法组合到一起,示例代码如下:

from typing import List  
from llama_index.core import QueryBundle  
from llama_index.core.retrievers import BaseRetriever  
from llama_index.core.schema import NodeWithScore  
import asyncio  
  
class FusionRetriever(BaseRetriever):  
    """Ensemble retriever with fusion."""  
  
    def __init__(  
        self,  
        retrievers: List[BaseRetriever],  
        similarity_top_k: int = 2,  
    ) -> None:  
        """Init params."""  
        self._retrievers = retrievers  
        self._similarity_top_k = similarity_top_k  
        super().__init__()  
  
    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:  
        """Retrieve."""  
        results = asyncio.run(  
            run_queries(query_bundle.query_str, self._retrievers)  
        )  
        final_results = fuse_results(results, similarity_top_k=self._similarity_top_k)  
        return final_results  
  • 这个融合检索器的类继承自BaseRetriever 类,重写了_retrieve 方法

  • 构造方法中的参数retrievers 是多个检索器的集合,similarity_top_k 是最相似的结果数量

  • 在_retrieve 方法中,调用了run_queries 方法来获取检索结果results

  • 然后调用了fuse_results 方法来融合检索结果并返回

我们来看融合检索器运行后的检索结果,代码示例如下:

fusion_retriever = FusionRetriever(  
    [text_retriever, vector_retriever], similarity_top_k=2  
)  
question = "Which two members of the Avengers created Ultron?"  
nodes = fusion_retriever.retrieve(question)  
for node in nodes:  
    print("-" * 50)  
    print(f"node content: {node.text[:100]}...")  
    print(f"node score: {node.score}\n")  
  
# 显示结果  
-----------------------------------------------node content: In the Eastern European country of Sokovia, the Avengers—Tony Stark, Thor, Bruce Banner, Steve Roger...  
node score: 0.03306010928961749  
  
-----------------------------------------------node content: Thor departs to consult with Dr. Erik Selvig on the apocalyptic future he saw in his hallucination, ...  
node score: 0.016666666666666666  
  • 首先定义了一个 FusionRetriever 对象,传入了全文检索器和向量检索器,同时设置了最相似的结果数量为 2

  • 然后传入了一个问题,获取检索结果

从结果中可以看到,检索结果节点返回的分数是经过 RRF 融合后的分数,分数值比较低,与原始的 Rerank 分数值不太匹配,这时我们可以使用 Rerank 模型来对检索结果进行重排序。

from llama_index.core.query_engine import RetrieverQueryEngine  
  
rerank = CustomRerank(  
    model="BAAI/bge-reranker-base", url="http://localhost:7007", top_n=2  
)  
Settings.llm = llm  
query_engine = RetrieverQueryEngine(fusion_retriever, node_postprocessors=[rerank])  
response = query_engine.query(question)  
print(f"response: {response}")  
for node in response.source_nodes:  
    print("-" * 50)  
    print(f"node content: {node.text[:100]}...")  
    print(f"node score: {node.score}\n")  
  
# 显示结果  
response: Tony Stark and Bruce Banner.  
-----------------------------------------------node content: In the Eastern European country of Sokovia, the Avengers—Tony Stark, Thor, Bruce Banner, Steve Roger...  
node score: 0.8329173  
  
-----------------------------------------------node content: Thor departs to consult with Dr. Erik Selvig on the apocalyptic future he saw in his hallucination, ...  
node score: 0.24689633  
  • CustomRerank 类是一个自定义的 Rerank 类,这个类的代码可以参考这篇文章中的代码

  • 在系统设置中设置了 LLM 模型来生成答案

  • 通过混合检索器构建查询引擎,并在node_postprocessors 参数中传入了 Rerank 模型,表示在检索结果后使用 Rerank 模型对检索结果进行重排序

  • 最后传入问题,获取检索结果

从结果中可以看到,检索结果节点返回的分数是经过 Rerank 模型重排序后的分数,分数值比较高,这样我们的混合检索系统就构建完成了。

总结

混合检索是一种在 RAG 应用中常用的检索策略,通过融合多种检索方式,可以提高检索的准确性和效率。今天我们通过 LlamaIndex 的代码实践,了解了构建混合检索系统的流程,同时也学习了如何使用 Llama3 和 ElasticSearch 来实现混合检索的效果,以及混合检索中一些常见的检索策略和排序算法。

关注我,一起学习各种人工智能和 AIGC 新技术,欢迎交流,如果你有什么想问想说的,欢迎在评论区留言。

参考:

[1]

LlamaIndex: https://www.llamaindex.ai/

[2]

Llama3: https://llama.meta.com/llama3/

[3]

ElasticSearch: https://www.elastic.co/cn/elasticsearch/

[4]

Ollama: https://ollama.com/

[5]

vllm: https://github.com/vllm-project/vllm

[7]

自定义 LLM: https://docs.llamaindex.ai/en/stable/module_guides/models/llms/usage_custom/

[9]

TEI: https://github.com/huggingface/text-embeddings-inference

[10]

Xinference: https://inference.readthedocs.io/en/latest/

[12]

BAAI/bge-base-en-v1.5: https://huggingface.co/BAAI/bge-base-en-v1.5

[13]

BAAI/bge-reranker-base: https://huggingface.co/BAAI/bge-reranker-base

[

14]

复仇者联盟: https://en.wikipedia.org/wiki/Avenger

[16]

ELSER 模型: https://www.elastic.co/guide/en/machine-learning/current/ml-nlp-elser.html

[17]

这里: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf

更多AI工具,参考Github-AiBard123国内AiBard123

可关注我们的公众号:每天AI新工具