跳到主要内容

流式 LangChain

流式对于使基于法学硕士的应用程序能够响应最终用户至关重要。

LLM、解析器、提示器、检索器和代理等重要的 LangChain 原函数实现了 LangChain Runnable Interface。

该接口提供了两种流内容的通用方法:

  1. 同步流和异步astream:流式传输的默认实现,用于流式传输链中的最终输出。
  2. async astream_events 和 async astream_log:它们提供了一种从链中传输中间步骤和最终输出的方法。

让我们来看看这两种方法,并尝试理解如何使用它们。

使用流式

所有 Runnable 对象都实现一个称为 Stream 的同步方法和一个称为 astream 的异步变体。

这些方法旨在以块的形式传输最终输出,一旦每个块可用就生成它。

仅当程序中的所有步骤都知道如何处理输入流时,流式传输才可能实现;即,一次处理一个输入块,并产生相应的输出块。

此处理的复杂性可能会有所不同,从简单的任务(例如由 LLM 生成的令牌)到更具挑战性的任务(例如在整个 JSON 完成之前流式传输 JSON 结果的部分)。

开始探索流媒体的最佳位置是法学硕士应用程序中最重要的一个组件——法学硕士本身!

LLMs 和 聊天模型

大型语言模型及其聊天变体是基于 LLM 的应用程序的主要瓶颈。

大型语言模型可能需要几秒钟才能生成对查询的完整响应。这远远慢于应用程序对最终用户的响应速度约为 200-300 毫秒的阈值。

让应用程序感觉响应更快的关键策略是显示中间进度;即,按令牌流式传输模型令牌的输出。

我们将展示使用 Anthropic 的聊天模型进行流式传输的示例。要使用该模型,您需要安装 langchain-anthropic 包。您可以使用以下命令来执行此操作:

pip install -qU langchain-anthropic
# Showing the example using anthropic, but you can use
# your favorite chat model!
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic()

chunks = []
async for chunk in model.astream("hello. tell me something about yourself"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)

 Hello|!| My| name| is| Claude|.| I|'m| an| AI| assistant| created| by| An|throp|ic| to| be| helpful|,| harmless|,| and| honest|.||

让我们检查其中一个块

chunks[0]
AIMessageChunk(content=' Hello')

我们得到了一个叫做 AIMessageChunk 的东西。该块代表 AIMessage 的一部分。

消息块在设计上是可加的——人们可以简单地将它们相加以获得迄今为止的响应状态!

chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4]
AIMessageChunk(content=' Hello! My name is')

Chains

实际上,所有 LLM 申请都涉及更多步骤,而不仅仅是调用语言模型。

让我们使用 LangChain 表达式语言(LCEL)构建一个简单的链,它结合了提示、模型和解析器,并验证流式传输是否有效。

我们将使用 StrOutputParser 来解析模型的输出。这是一个简单的解析器,它从 AIMessageChunk 中提取内容字段,为我们提供模型返回的Token。

::: tip 提示 LCEL 是一种通过将不同的 LangChain 原语链接在一起来指定“程序”的声明方式。使用 LCEL 创建的链受益于 Stream 和 astream 的自动实现,允许流式传输最终输出。事实上,用 LCEL 创建的链实现了整个标准 Runnable 接口。 :::

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("tell me a joke about {topic}")
parser = StrOutputParser()
chain = prompt | model | parser

async for chunk in chain.astream({"topic": "parrot"}):
print(chunk, end="|", flush=True)
 Here|'s| a| silly| joke| about| a| par|rot|:|

What| kind| of| teacher| gives| good| advice|?| An| ap|-|parent| (|app|arent|)| one|!||

::: info 注意 您不必使用 LangChain 表达式语言来使用 LangChain,而是可以依靠标准的命令式编程方法,通过单独调用每个组件上的调用、批处理或流,将结果分配给变量,然后在您认为合适的情况下在下游使用它们。

如果这满足您的需求,那么我们没问题👌! :::

使用输入流

如果您想在生成 JSON 时从输出中流式传输该怎么办?

如果您依赖 json.loads 来解析部分 json,解析将会失败,因为部分 json 不是有效的 json。

您可能会完全不知所措,并声称无法传输 JSON。

好吧,事实证明有一种方法可以做到这一点——解析器需要对输入流进行操作,并尝试将部分 json“自动完成”为有效状态。

让我们看看这样一个解析器的运行情况,以了解这意味着什么。

from langchain_core.output_parsers import JsonOutputParser

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models
async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, flush=True)
{}
{'countries': []}
{'countries': [{}]}
{'countries': [{'name': ''}]}
{'countries': [{'name': 'France'}]}
{'countries': [{'name': 'France', 'population': 67}]}
{'countries': [{'name': 'France', 'population': 6739}]}
{'countries': [{'name': 'France', 'population': 673915}]}
{'countries': [{'name': 'France', 'population': 67391582}]}
{'countries': [{'name': 'France', 'population': 67391582}, {}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Sp'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 4675}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 467547}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': ''}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan'}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 12}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 12647}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 1264764}]}
{'countries': [{'name': 'France', 'population': 67391582}, {'name': 'Spain', 'population': 46754778}, {'name': 'Japan', 'population': 126476461}]}

现在,让我们中断流式传输。我们将使用前面的示例,并在末尾附加一个提取函数,从最终的 JSON 中提取国家/地区名称。

::: warning 风险 链中对最终输入而不是对输入流进行操作的任何步骤都可能会通过 Stream 或 astream 破坏流功能。 :::

::: tip 提示 稍后,我们将讨论 astream_events API,它流式传输中间步骤的结果。即使链包含仅对最终输入进行操作的步骤,此 API 也会流式传输中间步骤的结果。 :::

from langchain_core.output_parsers import (
JsonOutputParser,
)


# A function that operates on finalized inputs
# rather than on an input_stream
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = model | JsonOutputParser() | _extract_country_names

async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, end="|", flush=True)
['France', 'Spain', 'Japan']|

生成器函数

让我们使用可以在输入流上运行的生成器函数来修复流。

::: tip 提示 生成器函数(使用yield的函数)允许编写对输入流进行操作的代码 :::

from langchain_core.output_parsers import JsonOutputParser


async def _extract_country_names_streaming(input_stream):
"""A function that operates on input streams."""
country_names_so_far = set()

async for input in input_stream:
if not isinstance(input, dict):
continue

if "countries" not in input:
continue

countries = input["countries"]

if not isinstance(countries, list):
continue

for country in countries:
name = country.get("name")
if not name:
continue
if name not in country_names_so_far:
yield name
country_names_so_far.add(name)


chain = model | JsonOutputParser() | _extract_country_names_streaming

async for text in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'
):
print(text, end="|", flush=True)

France|Sp|Spain|Japan|

::: info 注意 由于上面的代码依赖于 JSON 自动完成,因此您可能会看到国家/地区的部分名称(例如 Sp 和 Spain),这不是人们想要的提取结果!

我们关注的是流媒体概念,而不一定是链条的结果。 :::

非流式组件

一些内置组件(例如 Retrievers)不提供任何流式传输。如果我们尝试流式传输它们会发生什么?

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings

template = """Answer the question based only on the following context:
{context}

Question: {question}
"""
prompt = ChatPromptTemplate.from_template(template)

vectorstore = FAISS.from_texts(
["harrison worked at kensho", "harrison likes spicy food"],
embedding=OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever()

chunks = [chunk for chunk in retriever.stream("where did harrison work?")]
chunks
[[Document(page_content='harrison worked at kensho'),
Document(page_content='harrison likes spicy food')]]

Stream 刚刚从该组件产生了最终结果。

当然,不是所有的组件都需要实现流式,一些情况下既没必要,麻烦又没什么意义。

::: tip 提示 使用非流式组件构建的 LCEL 链在很多情况下仍然能够进行流式传输,部分输出的流式传输在链中最后一个非流式步骤之后开始。 :::

retrieval_chain = (
{
"context": retriever.with_config(run_name="Docs"),
"question": RunnablePassthrough(),
}
| prompt
| model
| StrOutputParser()
)
for chunk in retrieval_chain.stream(
"Where did harrison work? " "Write 3 made up sentences about this place."
):
print(chunk, end="|", flush=True)
 Based| on| the| given| context|,| the| only| information| provided| about| where| Harrison| worked| is| that| he| worked| at| Ken|sh|o|.| Since| there| are| no| other| details| provided| about| Ken|sh|o|,| I| do| not| have| enough| information| to| write| 3| additional| made| up| sentences| about| this| place|.| I| can| only| state| that| Harrison| worked| at| Ken|sh|o|.||

现在我们已经了解了 Stream 和 astream 的工作原理,接下来让我们进入流事件的世界。

使用流事件

事件流是一个测试版 API。此 API 可能会根据反馈进行一些更改

::: info 注释 在 langchain-core 0.1.14 中引入。 :::

import langchain_core

langchain_core.__version__
'0.1.18'

为了使 astream_events API 正常工作:

  • 尽可能在整个代码中使用异步(例如,异步工具等)
  • 如果定义自定义functions / runnables,则传播回调
  • 每当使用不带 LCEL 的可运行对象时,请确保在 LLM 上调用 .astream() 而不是 .ainvoke 以强制 LLM 流式传输令牌。
  • 如果有任何事情没有按预期工作,请告诉我们!

事件参考

下面是一个参考表,显示了各种 Runnable 对象可能发出的一些事件。

::: info 注释 当流正确实现时,直到输入流完全被消耗之后,才知道可运行的输入。这意味着通常仅包含结束事件的输入,而不包含开始事件的输入。 :::

聊天模型

让我们首先查看聊天模型生成的事件。

events = []
async for event in model.astream_events("hello", version="v1"):
events.append(event)
/home/eugene/src/langchain/libs/core/langchain_core/_api/beta_decorator.py:86: LangChainBetaWarning: This API is in beta and may change in the future.
warn_beta(

::: info 注释 嘿 API 中那个有趣的 version=“v1” 参数是什么?!

这是一个测试版 API,我们几乎肯定会对其进行一些更改。

此版本参数将使我们能够最大程度地减少对代码的此类破坏性更改。

总之,我们现在烦你,以后就不用烦你了。 :::

让我们看一下一些开始事件和一些结束事件。

events[:3]
[{'event': 'on_chat_model_start',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'name': 'ChatAnthropic',
'tags': [],
'metadata': {},
'data': {'input': 'hello'}},
{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content=' Hello')}},
{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content='!')}}]
events[-2:]
[{'event': 'on_chat_model_stream',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'name': 'ChatAnthropic',
'data': {'chunk': AIMessageChunk(content='')}},
{'event': 'on_chat_model_end',
'name': 'ChatAnthropic',
'run_id': '555843ed-3d24-4774-af25-fbf030d5e8c4',
'tags': [],
'metadata': {},
'data': {'output': AIMessageChunk(content=' Hello!')}}]

让我们回顾一下解析流式 JSON 的示例链,以探索流式事件 API。

chain = (
model | JsonOutputParser()
) # Due to a bug in older versions of Langchain, JsonOutputParser did not stream results from some models

events = [
event
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
)
]

如果您检查前几个事件,您会发现有 3 个不同的开始事件,而不是 2 个开始事件。

三个启动事件对应:

  1. 链(模型+解析器)
  2. 模型
  3. 解析器
events[:3]
[{'event': 'on_chain_start',
'run_id': 'b1074bff-2a17-458b-9e7b-625211710df4',
'name': 'RunnableSequence',
'tags': [],
'metadata': {},
'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}},
{'event': 'on_chat_model_start',
'name': 'ChatAnthropic',
'run_id': '6072be59-1f43-4f1c-9470-3b92e8406a99',
'tags': ['seq:step:1'],
'metadata': {},
'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}},
{'event': 'on_parser_start',
'name': 'JsonOutputParser',
'run_id': 'bf978194-0eda-4494-ad15-3a5bfe69cd59',
'tags': ['seq:step:2'],
'metadata': {},
'data': {}}]

如果您查看最近 3 个事件,您认为您会看到什么?中间呢?

让我们使用此 API 从模型和解析器获取流事件的输出。我们忽略了开始事件、结束事件和链中的事件。

num_events = 0

async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
...

由于模型和解析器都支持流式传输,因此我们可以实时看到来自两个组件的流式传输事件!是不是很酷?

过滤事件

由于此 API 产生如此多的事件,因此能够过滤事件非常有用。

您可以按组件name、组件tag或组件type进行过滤。

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
include_names=["my_parser"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_parser_start', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': []}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': ''}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France'}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 6739}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 673915}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}]}}}
{'event': 'on_parser_stream', 'name': 'my_parser', 'run_id': 'f2ac1d1c-e14a-45fc-8990-e5c24e707299', 'tags': ['seq:step:2'], 'metadata': {}, 'data': {'chunk': {'countries': [{'name': 'France', 'population': 67391582}, {}]}}}
...

按类型

chain = model.with_config({"run_name": "model"}) | JsonOutputParser().with_config(
{"run_name": "my_parser"}
)

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
include_types=["chat_model"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chat_model_start', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' Here')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' is')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' JSON')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' with')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' requested')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' countries')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' and')}}
{'event': 'on_chat_model_stream', 'name': 'model', 'run_id': '98a6e192-8159-460c-ba73-6dfc921e3777', 'tags': ['seq:step:1'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' their')}}
...

按标签Tags

::: warning 警惕 标签由给定可运行对象的子组件继承。

如果您使用标签进行过滤,请确保这是您想要的。 :::

chain = (model | JsonOutputParser()).with_config({"tags": ["my_chain"]})

max_events = 0
async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
include_tags=["my_chain"],
):
print(event)
max_events += 1
if max_events > 10:
# Truncate output
print("...")
break
{'event': 'on_chain_start', 'run_id': '190875f3-3fb7-49ad-9b6e-f49da22f3e49', 'name': 'RunnableSequence', 'tags': ['my_chain'], 'metadata': {}, 'data': {'input': 'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`'}}
{'event': 'on_chat_model_start', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'input': {'messages': [[HumanMessage(content='output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`')]]}}}
{'event': 'on_parser_start', 'name': 'JsonOutputParser', 'run_id': '3b5e4ca1-40fe-4a02-9a19-ba2a43a6115c', 'tags': ['seq:step:2', 'my_chain'], 'metadata': {}, 'data': {}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' Here')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' is')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' JSON')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' with')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' the')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' requested')}}
{'event': 'on_chat_model_stream', 'name': 'ChatAnthropic', 'run_id': 'ff58f732-b494-4ff9-852a-783d42f4455d', 'tags': ['seq:step:1', 'my_chain'], 'metadata': {}, 'data': {'chunk': AIMessageChunk(content=' countries')}}
...

非流式组件

还记得某些组件由于不对输入流进行操作而无法很好地进行流传输吗?

虽然在使用 astream 时此类组件可能会中断最终输出的流式传输,但 astream_events 仍将从支持流式传输的中间步骤产生流式事件!

# Function that does not support streaming.
# It operates on the finalizes inputs rather than
# operating on the input stream.
def _extract_country_names(inputs):
"""A function that does not operates on input streams and breaks streaming."""
if not isinstance(inputs, dict):
return ""

if "countries" not in inputs:
return ""

countries = inputs["countries"]

if not isinstance(countries, list):
return ""

country_names = [
country.get("name") for country in countries if isinstance(country, dict)
]
return country_names


chain = (
model | JsonOutputParser() | _extract_country_names
) # This parser only works with OpenAI right now

正如预期的那样,astream API 无法正常工作,因为 _extract_country_names 不适用于流。

async for chunk in chain.astream(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
):
print(chunk, flush=True)
['France', 'Spain', 'Japan']

现在,让我们确认通过 astream_events 我们仍然可以看到来自模型和解析器的流输出。

num_events = 0

async for event in chain.astream_events(
'output a list of the countries france, spain and japan and their populations in JSON format. Use a dict with an outer key of "countries" which contains a list of countries. Each country should have the key `name` and `population`',
version="v1",
):
kind = event["event"]
if kind == "on_chat_model_stream":
print(
f"Chat model chunk: {repr(event['data']['chunk'].content)}",
flush=True,
)
if kind == "on_parser_stream":
print(f"Parser chunk: {event['data']['chunk']}", flush=True)
num_events += 1
if num_events > 30:
# Truncate the output
print("...")
break
Chat model chunk: ' Here'
Chat model chunk: ' is'
Chat model chunk: ' the'
Chat model chunk: ' JSON'
Chat model chunk: ' with'
Chat model chunk: ' the'
Chat model chunk: ' requested'
Chat model chunk: ' countries'
Chat model chunk: ' and'
Chat model chunk: ' their'
Chat model chunk: ' populations'
Chat model chunk: ':'
Chat model chunk: '\n\n```'
Chat model chunk: 'json'
Parser chunk: {}
Chat model chunk: '\n{'
Chat model chunk: '\n '
Chat model chunk: ' "'
Chat model chunk: 'countries'
Chat model chunk: '":'
Parser chunk: {'countries': []}
Chat model chunk: ' ['
Chat model chunk: '\n '
Parser chunk: {'countries': [{}]}
Chat model chunk: ' {'
Chat model chunk: '\n '
Chat model chunk: ' "'
...

传播回调

::: warning 警惕 如果您在工具中使用调用可运行对象,则需要将回调传播到可运行对象;否则,不会生成任何流事件。 :::

::: info 注释 当使用 RunnableLambdas 或 @chain 装饰器时,回调会在幕后自动传播。 :::

from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool


def reverse_word(word: str):
return word[::-1]


reverse_word = RunnableLambda(reverse_word)


@tool
def bad_tool(word: str):
"""Custom tool that doesn't propagate callbacks."""
return reverse_word.invoke(word)


async for event in bad_tool.astream_events("hello", version="v1"):
print(event)
{'event': 'on_tool_start', 'run_id': 'ae7690f8-ebc9-4886-9bbe-cb336ff274f2', 'name': 'bad_tool', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_tool_stream', 'run_id': 'ae7690f8-ebc9-4886-9bbe-cb336ff274f2', 'tags': [], 'metadata': {}, 'name': 'bad_tool', 'data': {'chunk': 'olleh'}}
{'event': 'on_tool_end', 'name': 'bad_tool', 'run_id': 'ae7690f8-ebc9-4886-9bbe-cb336ff274f2', 'tags': [], 'metadata': {}, 'data': {'output': 'olleh'}}

这是正确传播回调的重新实现。您会注意到,现在我们也从reverse_word runnable获取事件。

@tool
def correct_tool(word: str, callbacks):
"""A tool that correctly propagates callbacks."""
return reverse_word.invoke(word, {"callbacks": callbacks})


async for event in correct_tool.astream_events("hello", version="v1"):
print(event)
{'event': 'on_tool_start', 'run_id': '384f1710-612e-4022-a6d4-8a7bb0cc757e', 'name': 'correct_tool', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': 'c4882303-8867-4dff-b031-7d9499b39dda', 'tags': [], 'metadata': {}, 'data': {'input': 'hello'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': 'c4882303-8867-4dff-b031-7d9499b39dda', 'tags': [], 'metadata': {}, 'data': {'input': 'hello', 'output': 'olleh'}}
{'event': 'on_tool_stream', 'run_id': '384f1710-612e-4022-a6d4-8a7bb0cc757e', 'tags': [], 'metadata': {}, 'name': 'correct_tool', 'data': {'chunk': 'olleh'}}
{'event': 'on_tool_end', 'name': 'correct_tool', 'run_id': '384f1710-612e-4022-a6d4-8a7bb0cc757e', 'tags': [], 'metadata': {}, 'data': {'output': 'olleh'}}

如果您从 Runnable Lambda 或 @chains 中调用 Runnable,则回调将代表您自动传递。

from langchain_core.runnables import RunnableLambda


async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


reverse_and_double = RunnableLambda(reverse_and_double)

await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v1"):
print(event)
{'event': 'on_chain_start', 'run_id': '4fe56c7b-6982-4999-a42d-79ba56151176', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': '335fe781-8944-4464-8d2e-81f61d1f85f5', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': '335fe781-8944-4464-8d2e-81f61d1f85f5', 'tags': [], 'metadata': {}, 'data': {'input': '1234', 'output': '4321'}}
{'event': 'on_chain_stream', 'run_id': '4fe56c7b-6982-4999-a42d-79ba56151176', 'tags': [], 'metadata': {}, 'name': 'reverse_and_double', 'data': {'chunk': '43214321'}}
{'event': 'on_chain_end', 'name': 'reverse_and_double', 'run_id': '4fe56c7b-6982-4999-a42d-79ba56151176', 'tags': [], 'metadata': {}, 'data': {'output': '43214321'}}

并使用@chain装饰器:

from langchain_core.runnables import chain


@chain
async def reverse_and_double(word: str):
return await reverse_word.ainvoke(word) * 2


await reverse_and_double.ainvoke("1234")

async for event in reverse_and_double.astream_events("1234", version="v1"):
print(event)
{'event': 'on_chain_start', 'run_id': '7485eedb-1854-429c-a2f8-03d01452daef', 'name': 'reverse_and_double', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_start', 'name': 'reverse_word', 'run_id': 'e7cddab2-9b95-4e80-abaf-4b2429117835', 'tags': [], 'metadata': {}, 'data': {'input': '1234'}}
{'event': 'on_chain_end', 'name': 'reverse_word', 'run_id': 'e7cddab2-9b95-4e80-abaf-4b2429117835', 'tags': [], 'metadata': {}, 'data': {'input': '1234', 'output': '4321'}}
{'event': 'on_chain_stream', 'run_id': '7485eedb-1854-429c-a2f8-03d01452daef', 'tags': [], 'metadata': {}, 'name': 'reverse_and_double', 'data': {'chunk': '43214321'}}
{'event': 'on_chain_end', 'name': 'reverse_and_double', 'run_id': '7485eedb-1854-429c-a2f8-03d01452daef', 'tags': [], 'metadata': {}, 'data': {'output': '43214321'}}