AI 文摘

如何使用LangChain构建带有自查询检索器的RAG系统





作者: 知觉之门 来源: 知觉之门

RAG + 元数据过滤 = 出色的电影推荐

最近,我在浏览 Max 寻找电影时,突然想到:为什么不能用自然语言来查询电影,而不仅仅是通过标题或演员?例如,为什么不能在 Max、Netflix 或 Hulu 的搜索栏中输入以下查询:

  • 查找英语剧情片,时长不到2小时,有宠物。

  • 推荐一部僵尸电影,但确保它们很有趣。

  • 我喜欢“瞬息全宇宙”。给我一个类似的电影,但更黑暗。

这种方法的优点远远超出了自然方式的搜索电影。此方法还保护了用户的隐私。不是挖掘用户的操作、喜欢和不喜欢来喂养推荐系统,这个系统根本不需要用户数据。唯一需要的就是查询。

所以我构建了电影搜索。这是一个基于 RAG 的系统,它接收用户的查询,对其进行嵌入,并进行相似度搜索以找到类似的电影。但它超越了普通的 RAG。这个系统使用了一种叫做自查询检索器 的东西。这允许在进行相似度搜索之前,根据电影的元数据进行过滤。因此,如果用户有一个查询,例如“推荐 1980 年后制作的恐怖电影,其中有很多爆炸场面”,则搜索将首先过滤掉所有不是“1980 年后制作的恐怖电影”的电影,然后再进行相似度搜索以找到“有很多爆炸场面”的电影。

在本文中,我将提供一个如何构建该系统的高级概述。如果您想深入了解,可以查看上面的链接。

让我们开始。

检索数据

该项目的数据来自电影数据库(TMDB),获得了所有者的许可。他们的API很容易使用,维护良好且没有严格的速率限制。我从他们的API中提取了以下电影属性:

  • 标题

  • 运行时间(分钟)

  • 语言

  • 概述

  • 发布年份

  • 类型

  • 描述电影的关键字

  • 演员

  • 导演

  • 流式观看地点

  • 购买地点

  • 租赁地点

  • 生产公司列表

以下是使用 TMDB API 和 Python的 requests 库从TMDB中提取数据的一些代码:

def get_data(API_key, Movie_ID, max_retries=5):    
    """    
    Function to pull details of your film of interest in JSON format.  
  
    parameters:    
    API_key (str): Your API key for TMBD    
    Movie_ID (str): TMDB id for film of interest  
  
    returns:    
    dict: JSON formatted dictionary containing all details of your film of    
    interest    
    """  
  
    query = 'https://api.themoviedb.org/3/movie/' + Movie_ID + \    
        '?api_key='+API_key + '&append_to_response=keywords,' + \    
            'watch/providers,credits'    
    response = requests.get(query)    
    for i in range(max_retries):    
        if response.status_code == 429:    
                
            print(    
                f"Request limit reached. Waiting and retrying ({i+1}/{    
                    max_retries})")    
            time.sleep(2** i)      
        else:    
            dict = response.json()    
            return dict  

请注意查询需要电影ID(也来自TMDB),以及append_to_response ,它允许我pull多种数据,例如关键字、观看提供商、电影credit(导演和演员)以及基本电影信息。此外,还有一些基本的 Scaffold 代码,以防止达到速率限制(尽管从未观察到)。

然后,我们需要解析JSON响应。以下是一个代码片段,展示了如何解析演员和导演列表:

credits = dict['credits']    
    actor_list, director_list = [], []  
  
    
cast = credits['cast']    
NUM_ACTORS = 5    
for member in cast[:NUM_ACTORS]:    
    actor_list.append(member["name"])  
  
    
crew = credits['crew']    
for member in crew:    
    if member['job'] == 'Director':    
        director_list.append(member["name"])  
  
actor_str = ', '.join(list(set(actor_list)))    
director_str = ', '.join(list(set(director_list)))  

请注意,我将演员数量限制为电影中的前五名。我还必须指定我只对导演感兴趣,因为响应中还包括其他类型的剧组成员,例如剪辑师、服装设计师等。

所有这些数据随后被编译成 CSV 文件。上面列出的每个属性都成为一列,每一行现在代表一部特定的电影。以下是从2008_movie_collection_data.csv 文件中截取的电影片段,该文件是通过编程创建的。对于这个项目,我获得了从 1920 年到 2023 年的大约 100 部顶级电影。

[此处插入电影数据片段]

难以置信,我仍然没有看过功夫熊猫。也许我需要在这个项目后观看。

将文档上传到 Pinecone

接下来,我需要将 CSV 数据上传到 Pinecone。通常在 RAG 系统中,分块处理非常重要,但是在这里,每个“文档”(CSV 文件的一行)都相当短,因此块处理并不重要。我首先需要将每个 CSV文件转换为 LangChain 文档,然后指定哪些字段应该是主要内容,哪些字段应该是元数据。

以下是构建这些文档的代码片段:

loader = DIRECTORYLoader(    
    path="./data",    
    glob="*.csv",    
    loader_cls=CSVLoader,    
    show_progress=True)  
  
docs = loader.load()  
  
metadata_field_info = [    
    AttributeInfo(    
        name="标题", description="电影的标题", type="string"),    
    AttributeInfo(name="运行时间(分钟)",    
                  description="电影的运行时间(分钟)", type="integer"),    
    AttributeInfo(name="语言",    
                  description="电影的语言", type="string"),    
    ...    
]  
  
for doc in docs:    
        
    page_content_dict = dict(line.split(": ", 1)    
                             for line in doc.page_content.split("\n") if ": " in line)  
  
        doc.page_content = '概述:' + page_content_dict.get(    
        '概述') + '. 关键字:' + page_content_dict.get('关键字')    
    doc.metadata = {field.name: page_content_dict.get(    
        field.name) for field in metadata_field_info}  
  
        
    for field in fields_to_convert_list:    
        convert_to_list(doc, field)        
  
        
    for field in fields_to_convert_int:    
        convert_to_int(doc, field)  

DirectoryLoader 来自 LangChain ,负责加载所有 CSV 文件到文档中。然后,我需要指定什么应该是 page_content ,什么应该是 metadata 。这是一个重要的决策。page_content 将被嵌入并在检索阶段用于相似度搜索。metadata 将仅用于在进行相似度搜索之前进行过滤。 我决定将“概述”和“关键字”属性嵌入,并将其余属性作为元数据。进一步调整是否也应该将“标题”包含在 page_content 中,但我发现这种配置适合大多数用户查询。

然后,将文档上传到 Pinecone。这个过程相当简单:

PINECONE_KEY, PINECONE_INDEX_NAME = os.getenv(    
    'PINECONE_API_KEY'), os.getenv('PINECONE_INDEX_NAME')  
  
pc = Pinecone(api_key=PINECONE_KEY)  
  
    
pc.create_index(    
    name=PINECONE_INDEX_NAME,    
    dimension=1536,    
    metric="cosine",    
    spec=PodSpec(    
        environment="gcp-starter"    
    )    
)  
  
    
pc_index = pc.Index(PINECONE_INDEX_NAME)    
print(pc_index.describe_index_stats())  
  
embeddings = OpenAIEmbeddings(model='text-embedding-ada-002')  
  
vectorstore = PineconeVectorStore(    
    pc_index, embeddings    
)  
  
    
namespace = f"pinecone/{PINECONE_INDEX_NAME}"    
record_manager = SQLRecordManager(    
    namespace, db_url="sqlite:///record_manager_cache.sql"    
)  
  
record_manager.create_schema()  
  
    
index(docs, record_manager, vectorstore,    
      cleanup="full", source_id_key="Website")  

以下是该代码的一些要点:

  • 使用 SQLRecordManager 确保了在多次运行代码时,不会上传重复的文档到 Pinecone。如果文档被修改,只有该文档将被修改 vector 存储中。

  • 我们使用的是 OpenAI 的经典 text-embedding-ada-002 嵌入模型。

创建自查询检索器

自查询检索器将允许我们通过提前定义的元数据来过滤在 RAG 期间检索到的电影。这将大大提高我们的电影推荐器的有用性。

在选择矢量存储时,一个重要的考虑因素是确保它支持元数据过滤,因为不是所有矢量存储都支持。LangChain 提供了一份支持自查询检索的数据库列表。另一个重要的考虑因素是什么类型的比较器允许每个矢量存储。比较器是我们根据元数据过滤的方法。例如,我们可以使用 eq 比较器来确保我们的电影属于科幻电影genre:eq(‘Genre’, ‘Science Fiction’) 。不是所有矢量存储都允许所有比较器。例如,查看 Chroma 中的允许比较器 以及 Pinecone 中的比较器如何不同。

除了告诉模型允许的比较器外,我们还可以为模型提供用户查询和相应过滤器的示例。这被称为少样本学习 ,它对于指导模型非常有价值。

看看以下两个用户查询:

  • 《推荐一些由 Yorgos Lanthimos 导演的电影。》

  • 《与 Yorgos Lanthimos 电影相似的电影。》

我的元数据过滤模型可以轻松地为这两个示例写出相同的过滤器查询,即使我想将它们处理得不同。第一个应该只产生由 Lanthimos 导演的电影,而第二个应该产生与 Lanthimos 电影相同的电影。我通过提供模型的示例来确保这种行为。语言模型的美丽之处在于它们可以使用其“推理”能力和世界知识来自这些少镜头示例中推断出其他用户查询。

document_content_description = "电影的简要概述,along with keywords"  
  
            
        allowed_comparators = [    
            "$eq",      
            "$ne",      
            "$gt",      
            "$gte",      
            "$lt",      
            "$lte",      
            "$in",      
            "$nin",      
            "$exists",     
        ]  
  
        examples = [    
            (    
                "Recommend some films by Yorgos Lanthimos.",    
                {    
                    "query": "Yorgos Lanthimos",    
                    "filter": 'in("Directors", ["Yorgos Lanthimos"])',    
                },    
            ),    
            (    
                "Films similar to Yorgos Lanthmios movies.",    
                {    
                    "query": "Dark comedy, absurd, Greek Weird Wave",    
                    "filter": 'NO_FILTER',    
                },    
            ),    
            ...    
        ]  
  
        metadata_field_info = [    
            AttributeInfo(    
                name="Title", description="电影的标题", type="string"),    
            AttributeInfo(name="Runtime (minutes)",    
                          description="电影的分钟运行时间", type="integer"),    
            AttributeInfo(name="Language",    
                          description="电影的语言", type="string"),    
            ...    
        ]  
  
        constructor_prompt = get_query_constructor_prompt(    
            document_content_description,    
            metadata_field_info,    
            allowed_comparators=allowed_comparators,    
            examples=examples,    
        )  
  
        output_parser = StructuredQueryOutputParser.from_components()    
        query_constructor = constructor_prompt | query_model | output_parser  
  
        retriever = SelfQueryRetriever(    
            query_constructor=query_constructor,    
            vectorstore=vectorstore,    
            structured_query_translator=PineconeTranslator(),    
            search_kwargs={'k': 10}    
        )  

此外,模型还需要了解每个元数据字段的描述。这有助于它理解哪些元数据过滤是可能的。

最后,我们构建链条。这里的 query_model 是使用 OpenAI API 的 GPT-4 Turbo 实例。我建议使用 GPT-4 而不是 3.5 来编写这些元数据过滤器查询,因为这是一个关键步骤,3.5 在这里经常出错。search_kwargs={‘k’:10} 告诉检索器根据用户查询提取最相似的前 10 部电影。

创建聊天模型

终于,在构建自查询检索器之后,我们可以在其基础上构建标准的 RAG 模型。我们首先定义聊天模型。这是我所谓的_summary 模型,因为它接收上下文(检索的电影+系统消息)并生成每个推荐的摘要。如果您想降低成本,这个模型可以使用 GPT-3.5 Turbo,如果您想要最好的结果,可以使用 GPT-4 Turbo。

在系统消息中,我告诉机器人它的目标,并提供了一系列推荐和限制,其中最重要的是不要推荐 自查询检索器 没有提供的电影 。在测试中,当用户的查询没有从数据库中检索到任何电影时,我遇到了问题。例如,《“推荐一些恐怖电影,主演马特·达蒙,由韦斯·安德森执导,制作于1980年之前”》,这将导致自查询检索器检索不到电影(因为这个电影不存在)。在没有电影数据的情况下,模型将使用自己的(错误的)记忆尝试推荐一些电影。这不是好的行为。我不想 Netflix 推荐一个不在数据库中的电影。以下系统消息能够阻止这种行为。我注意到 GPT-4 比 GPT-3.5 更好地遵循指令,这是预料之中的。

chat_model = ChatOpenAI(  
    model=SUMMARY_MODEL_NAME,  
    temperature=0,  
    streaming=True,  
)  



 



prompt = ChatPromptTemplate.from_messages(  
    [  
        (  
            'system',  
            """  
            您的目标是根据用户的查询和检索的上下文推荐电影。如果检索的电影不相关,忽略它。如果您的上下文为空或没有相关的电影,不要推荐电影,告诉用户您找不到相关的电影。目标是推荐三到五个电影,只要这些电影是相关的。您不能推荐超过五个电影。您的推荐应该是相关的、原创的且至少有二三句话长。  
  
                        您不能推荐不在上下文中的电影。  
  
                # 输出模板  
                -**电影标题**:  
                    - 播放时间:  
                    - 发布年份:  
                    - 流媒体:  
                    - (您推荐这个电影的理由)  
  
                            问题:{question}  
                            上下文:{context}  
            """  
        ),  
    ]  
)  



 



def format_docs(docs):  
    return "\n\n".join(f"{doc.page_content}\n\nMeta{doc.metadata}" for doc in docs)  



 



rag_chain_from_docs = (  
    RunnablePassthrough.assign(  
        context=(lambda x: format_docs(x["context"]))  
    )  
    | prompt  
    | chat_model  
    | StrOutputParser()  
)  
  
rag_chain_with_source = RunnableParallel(  
    {"context": retriever, "question": RunnablePassthrough()}  
).assign(answer=rag_chain_from_docs)  

format_docs 用于格式化呈现给模型的信息,以便于理解和解析。我们向模型提供page_content (概述和关键字)以及metadata (所有其他电影属性);任何它可能需要更好地向用户推荐电影的内容。

rag_chain_from_docs 是一个链,它将检索到的文档格式化,输入格式化后的文档到模型的上下文,然后使用模型回答问题。最后,我们创建 rag_chain_with_source ,这是一个 RunnableParallel ,它并行运行两个操作:自查询检索器去检索相似文档,而查询只是通过 RunnablePassthrough() 传递给模型的。然后,我们将并行组件的结果组合起来,并使用 rag_chain_from_docs 生成答案。在这里,source 指的是检索器,它访问所有“源”文档。

因为我想将答案流式呈现给用户,所以我们使用以下代码:

for chunk in rag_chain_with_source.stream(query):  
    for key in chunk:  
        if key == 'answer':  
            yield chunk[key]  

演示

现在到了有趣的部分:与模型玩耍。如前所述,Streamlit 用于创建前端和托管应用程序。我不会在这里讨论 UI 代码;请查看原始代码以了解实现细节。它非常直接,并且 Streamlit 网站上有许多其他示例。

**电影搜索 UI。作者。

有几个可以使用的建议,但让我们尝试自己的查询:

示例查询和模型响应。作者。

在幕后,self-querying retriever 确保了.filtering 掉了非法语电影。然后,它执行了“成长故事”的相似搜索,结果是十个电影。最后,summarizer bot 选择了五个电影进行推荐。请注意,建议的电影日期从 1959 年到 2012 年。为方便起见,我确保机器人包括电影的播放时间、发行年份、流媒体提供商和机器人手工制作的推荐。

(旁注:如果您还没有观看[__The 400 Blows],请停止您正在做的事情,并观看[__The 400 Blows]。)

大语言模型中的负面特征,如其响应的非确定性,现在变成了正面特征。请模型同一个问题两次,您可能会获得略有不同的建议。

请注意当前实现的一些限制:

  • 没有保存推荐。用户可能想要重新访问旧的建议。

  • 手动更新电影数据库的原始数据。自动化并每周更新将是一个好主意。

  • self-querying Retrieval 的元数据过滤问题。例如,查询“贝恩·阿弗莱克 фільм”可能会出现问题。这可能意味着贝恩·阿弗莱克明星的电影或贝恩·阿弗莱克执导的电影。这是一个需要查询澄清的问题。

可能的改进项目可能是执行文档的重新排名或创建一个多转换的聊天模型,而不是仅仅是一个 QA 机器人。还可以创建一个agent recommender,它可以在 query 不明确时提示用户。

更多AI工具,参考Github-AiBard123国内AiBard123

可关注我们的公众号:每天AI新工具