RunnableParallel:操作输入和输出
RunnableParallel 对于操作一个 Runnable 的输出以匹配序列中下一个 Runnable 的输入格式非常有用。
这里提示的输入应该是一个带有“context”和“question”键的地图。用户输入只是问题。因此,我们需要使用检索器获取上下文,并通过“question”键下的用户输入.
%pip install --upgrade --quiet langchain langchain-openai
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)
model = ChatOpenAI()
retrieval_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| model
| StrOutputParser()
)
retrieval_chain.invoke("where did harrison work?")
'Harrison worked at Kensho.'
::: tip 提示 请注意,当将 RunnableParallel 与另一个 Runnable 组合时,我们甚至不需要将字典包装在 RunnableParallel 类中 - 类型转换已为我们处理。在链的上下文中,这些是等效的: :::
{"context": retriever, "question": RunnablePassthrough()}
RunnableParallel({"context": retriever, "question": RunnablePassthrough()})
RunnableParallel(context=retriever, question=RunnablePassthrough())
使用 itemgetter 作为简写
请注意,与 RunnableParallel 结合时,您可以使用 Python 的 itemgetter 作为简写从地图中提取数据。您可以在 Python 文档中找到有关 itemgetter 的更多信息。
在下面的示例中,我们使用 itemgetter 从映射中提取特定键:
from operator import itemgetter
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
vectorstore = FAISS.from_texts(
["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
template = """Answer the question based only on the following context:
{context}
Question: {question}
Answer in the following language: {language}
"""
prompt = ChatPromptTemplate.from_template(template)
chain = (
{
"context": itemgetter("question") | retriever,
"question": itemgetter("question"),
"language": itemgetter("language"),
}
| prompt
| model
| StrOutputParser()
)
chain.invoke({"question": "where did harrison work", "language": "italian"})
'Harrison ha lavorato a Kensho.'
并行化步骤
RunnableParallel(又名 RunnableMap)可以轻松并行执行多个 Runnable,并将这些 Runnable 的输出作为映射返回。
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
joke_chain = ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
poem_chain = (
ChatPromptTemplate.from_template("write a 2-line poem about {topic}") | model
)
map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
map_chain.invoke({"topic": "bear"})
{'joke': AIMessage(content="Why don't bears wear shoes?\n\nBecause they have bear feet!"),
'poem': AIMessage(content="In the wild's embrace, bear roams free,\nStrength and grace, a majestic decree.")}
并行性
RunnableParallel 对于并行运行独立进程也很有用,因为映射中的每个 Runnable 都是并行执行的。例如,我们可以看到我们之前的joke_chain、poem_chain 和map_chain 都具有大致相同的运行时间,尽管map_chain 执行了另外两个。
%%timeit
joke_chain.invoke({"topic": "bear"})
958 ms ± 402 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
poem_chain.invoke({"topic": "bear"})
1.22 s ± 508 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
map_chain.invoke({"topic": "bear"})
1.15 s ± 119 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)