
Why use LCEL

We recommend reading the LCEL Get started section first.

LCEL makes it easy to build complex chains from basic components. It does this by providing:

  1. A unified interface: every LCEL object implements the Runnable interface, which defines a common set of invocation methods (invoke, batch, stream, ainvoke, ...). This makes it possible for a chain of LCEL objects to automatically support these invocations as well; that is, every chain of LCEL objects is itself an LCEL object.
  2. Composition primitives: LCEL provides a number of primitives that make it easy to compose chains, parallelize components, add fallbacks, dynamically configure chain internals, and more.

To better understand the value of LCEL, it helps to see it in action and to think about how we might recreate similar functionality without it. In this walkthrough we'll do just that with the basic example from the get started section: a simple prompt + model chain, which already defines a lot of functionality under the hood, and we'll see what it takes to recreate all of it.

%pip install --upgrade --quiet langchain-core langchain-openai langchain-anthropic

Invoke

In the simplest case, we just want to pass in a topic string and get back a joke string:

from typing import List

import openai


prompt_template = "Tell me a short joke about {topic}"
client = openai.OpenAI()

def call_chat_model(messages: List[dict]) -> str:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

def invoke_chain(topic: str) -> str:
    prompt_value = prompt_template.format(topic=topic)
    messages = [{"role": "user", "content": prompt_value}]
    return call_chat_model(messages)

invoke_chain("ice cream")
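For comparison, here is a minimal sketch of what the equivalent chain could look like in LCEL, using the langchain-core and langchain-openai packages installed above (the variable names are just illustrative, and the later LCEL snippets in this walkthrough reuse them):

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# The | operator composes Runnables into a new Runnable.
prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
model = ChatOpenAI(model="gpt-3.5-turbo")
chain = prompt | model | StrOutputParser()

chain.invoke({"topic": "ice cream"})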

If we want to stream results, we need to change our function:

from typing import Iterator


def stream_chat_model(messages: List[dict]) -> Iterator[str]:
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        stream=True,
    )
    for response in stream:
        content = response.choices[0].delta.content
        if content is not None:
            yield content

def stream_chain(topic: str) -> Iterator[str]:
    prompt_value = prompt_template.format(topic=topic)
    return stream_chat_model([{"role": "user", "content": prompt_value}])


for chunk in stream_chain("ice cream"):
    print(chunk, end="", flush=True)
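With LCEL no extra streaming code is needed, since every chain exposes a stream method; a sketch reusing the chain from the earlier LCEL snippet:

# `chain` is the prompt | model | StrOutputParser() Runnable sketched above.
for chunk in chain.stream({"topic": "ice cream"}):
    print(chunk, end="", flush=True)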

Batch

If we want to run a batch of inputs in parallel, we'll again need a new function:

from concurrent.futures import ThreadPoolExecutor


def batch_chain(topics: list) -> list:
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(invoke_chain, topics))

batch_chain(["ice cream", "spaghetti", "dumplings"])
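The LCEL chain sketched earlier likewise supports batching out of the box:

# batch() runs the inputs concurrently and returns outputs in the same order.
chain.batch([{"topic": t} for t in ["ice cream", "spaghetti", "dumplings"]])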

Async

If we need an asynchronous version:

async_client = openai.AsyncOpenAI()

async def acall_chat_model(messages: List[dict]) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

async def ainvoke_chain(topic: str) -> str:
    prompt_value = prompt_template.format(topic=topic)
    messages = [{"role": "user", "content": prompt_value}]
    return await acall_chat_model(messages)

await ainvoke_chain("ice cream")
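Every LCEL chain also comes with async counterparts of its methods, so no separate client is needed; a sketch:

# ainvoke() is the async counterpart of invoke(); astream() and abatch() exist as well.
await chain.ainvoke({"topic": "ice cream"})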

LLM instead of chat model

If we want to use a completion endpoint instead of a chat endpoint:

def call_llm(prompt_value: str) -> str:
    response = client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt=prompt_value,
    )
    return response.choices[0].text

def invoke_llm_chain(topic: str) -> str:
    prompt_value = prompt_template.format(topic=topic)
    return call_llm(prompt_value)

invoke_llm_chain("ice cream")
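In LCEL only the model object changes; a sketch reusing the prompt from the earlier snippet together with the OpenAI LLM class from langchain-openai (the chat prompt is stringified automatically when passed to a completion model):

from langchain_openai import OpenAI

# Swap the chat model for a completion-style LLM; the rest of the chain is unchanged.
llm = OpenAI(model="gpt-3.5-turbo-instruct")
llm_chain = prompt | llm | StrOutputParser()

llm_chain.invoke({"topic": "ice cream"})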

Different model providers

If we want to use Anthropic instead of OpenAI:

import anthropic

anthropic_template = f"Human:\n\n{prompt_template}\n\nAssistant:"
anthropic_client = anthropic.Anthropic()

def call_anthropic(prompt_value: str) -> str:
    response = anthropic_client.completions.create(
        model="claude-2",
        prompt=prompt_value,
        max_tokens_to_sample=256,
    )
    return response.completion

def invoke_anthropic_chain(topic: str) -> str:
    prompt_value = anthropic_template.format(topic=topic)
    return call_anthropic(prompt_value)

invoke_anthropic_chain("ice cream")
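In LCEL, switching providers is again just a different model object; a sketch using the ChatAnthropic class from the langchain-anthropic package installed above, which handles the Human:/Assistant: formatting for us (the variable names are illustrative):

from langchain_anthropic import ChatAnthropic

anthropic_model = ChatAnthropic(model="claude-2")
anthropic_chain = prompt | anthropic_model | StrOutputParser()

anthropic_chain.invoke({"topic": "ice cream"})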

Runtime configurability

If we want to make the choice of chat model or LLM configurable at runtime:

def invoke_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> str:
    if model == "chat_openai":
        return invoke_chain(topic)
    elif model == "openai":
        return invoke_llm_chain(topic)
    elif model == "anthropic":
        return invoke_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def stream_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> Iterator[str]:
    if model == "chat_openai":
        return stream_chain(topic)
    elif model == "openai":
        # Note we haven't implemented this yet.
        return stream_llm_chain(topic)
    elif model == "anthropic":
        # Note we haven't implemented this yet
        return stream_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def batch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    # You get the idea
    ...

async def abatch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    ...

invoke_configurable_chain("ice cream", model="openai")
stream = stream_configurable_chain(
    "ice_cream",
    model="anthropic"
)
for chunk in stream:
    print(chunk, end="", flush=True)

# batch_configurable_chain(["ice cream", "spaghetti", "dumplings"])
# await ainvoke_configurable_chain("ice cream")
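With LCEL, one way to get this is the configurable_alternatives primitive; a sketch reusing the model objects from the earlier LCEL snippets (the keys chat_openai, openai and anthropic are just illustrative names):

from langchain_core.runnables import ConfigurableField

# Declare runtime-selectable alternatives for the model step of the chain.
configurable_model = model.configurable_alternatives(
    ConfigurableField(id="model"),
    default_key="chat_openai",
    openai=llm,
    anthropic=anthropic_model,
)
configurable_chain = prompt | configurable_model | StrOutputParser()

configurable_chain.invoke({"topic": "ice cream"})
configurable_chain.invoke(
    {"topic": "ice cream"},
    config={"configurable": {"model": "openai"}},
)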

Logging

If we want to log intermediate results:

# We'll print intermediate steps for illustrative purposes
def invoke_anthropic_chain_with_logging(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = anthropic_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_anthropic(prompt_value)
    print(f"Output: {output}")
    return output

invoke_anthropic_chain_with_logging("ice cream")
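With LCEL, rather than hand-writing print statements, intermediate steps can be logged by enabling LangSmith tracing; a sketch, assuming you have a LangSmith account and API key:

import os

# With tracing enabled, every step of every chain run is logged to LangSmith.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "..."  # your LangSmith API key

anthropic_chain.invoke({"topic": "ice cream"})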

Fallbacks

If we want to add fallback logic, in case one model API is down:

def invoke_chain_with_fallback(topic: str) -> str:
    try:
        return invoke_chain(topic)
    except Exception:
        return invoke_anthropic_chain(topic)

async def ainvoke_chain_with_fallback(topic: str) -> str:
    try:
        return await ainvoke_chain(topic)
    except Exception:
        # Note: we haven't actually implemented this.
        return await ainvoke_anthropic_chain(topic)

def batch_chain_with_fallback(topics: List[str]) -> List[str]:
    try:
        return batch_chain(topics)
    except Exception:
        # Note: we haven't actually implemented this.
        return batch_anthropic_chain(topics)

invoke_chain_with_fallback("ice cream")
# await ainvoke_chain_with_fallback("ice cream")
batch_chain_with_fallback(["ice cream", "spaghetti", "dumplings"])
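LCEL chains expose a with_fallbacks method for this; a sketch reusing the chains from the earlier LCEL snippets:

# Fall back to the Anthropic chain if the OpenAI chain raises an exception.
fallback_chain = chain.with_fallbacks([anthropic_chain])

fallback_chain.invoke({"topic": "ice cream"})
# await fallback_chain.ainvoke({"topic": "ice cream"})
fallback_chain.batch([{"topic": t} for t in ["ice cream", "spaghetti", "dumplings"]])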

Full code comparison

Even in this simple case, our LCEL chain succinctly packs in a lot of functionality. As chains become more complex, this becomes especially valuable.

from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List, Tuple

import anthropic
import openai


prompt_template = "Tell me a short joke about {topic}"
anthropic_template = f"Human:\n\n{prompt_template}\n\nAssistant:"
client = openai.OpenAI()
async_client = openai.AsyncOpenAI()
anthropic_client = anthropic.Anthropic()

def call_chat_model(messages: List[dict]) -> str:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
    )
    return response.choices[0].message.content

def invoke_chain(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    messages = [{"role": "user", "content": prompt_value}]
    output = call_chat_model(messages)
    print(f"Output: {output}")
    return output

def stream_chat_model(messages: List[dict]) -> Iterator[str]:
    stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=messages,
        stream=True,
    )
    for response in stream:
        content = response.choices[0].delta.content
        if content is not None:
            yield content

def stream_chain(topic: str) -> Iterator[str]:
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    stream = stream_chat_model([{"role": "user", "content": prompt_value}])
    for chunk in stream:
        print(f"Token: {chunk}", end="")
        yield chunk

def batch_chain(topics: list) -> list:
    with ThreadPoolExecutor(max_workers=5) as executor:
        return list(executor.map(invoke_chain, topics))

def call_llm(prompt_value: str) -> str:
    response = client.completions.create(
        model="gpt-3.5-turbo-instruct",
        prompt=prompt_value,
    )
    return response.choices[0].text

def invoke_llm_chain(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = prompt_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_llm(prompt_value)
    print(f"Output: {output}")
    return output

def call_anthropic(prompt_value: str) -> str:
    response = anthropic_client.completions.create(
        model="claude-2",
        prompt=prompt_value,
        max_tokens_to_sample=256,
    )
    return response.completion

def invoke_anthropic_chain(topic: str) -> str:
    print(f"Input: {topic}")
    prompt_value = anthropic_template.format(topic=topic)
    print(f"Formatted prompt: {prompt_value}")
    output = call_anthropic(prompt_value)
    print(f"Output: {output}")
    return output

async def ainvoke_anthropic_chain(topic: str) -> str:
    ...

def stream_anthropic_chain(topic: str) -> Iterator[str]:
    ...

def batch_anthropic_chain(topics: List[str]) -> List[str]:
    ...

def invoke_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> str:
    if model == "chat_openai":
        return invoke_chain(topic)
    elif model == "openai":
        return invoke_llm_chain(topic)
    elif model == "anthropic":
        return invoke_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def stream_configurable_chain(
    topic: str,
    *,
    model: str = "chat_openai"
) -> Iterator[str]:
    if model == "chat_openai":
        return stream_chain(topic)
    elif model == "openai":
        # Note we haven't implemented this yet.
        return stream_llm_chain(topic)
    elif model == "anthropic":
        # Note we haven't implemented this yet
        return stream_anthropic_chain(topic)
    else:
        raise ValueError(
            f"Received invalid model '{model}'."
            " Expected one of chat_openai, openai, anthropic"
        )

def batch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    ...

async def abatch_configurable_chain(
    topics: List[str],
    *,
    model: str = "chat_openai"
) -> List[str]:
    ...

def invoke_chain_with_fallback(topic: str) -> str:
    try:
        return invoke_chain(topic)
    except Exception:
        return invoke_anthropic_chain(topic)

async def ainvoke_chain_with_fallback(topic: str) -> str:
    try:
        return await ainvoke_chain(topic)
    except Exception:
        return await ainvoke_anthropic_chain(topic)

def batch_chain_with_fallback(topics: List[str]) -> List[str]:
    try:
        return batch_chain(topics)
    except Exception:
        return batch_anthropic_chain(topics)
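For comparison, a rough LCEL equivalent of all of the above might look something like the sketch below, which combines the pieces from the earlier LCEL snippets (the variable names are illustrative, and the LangSmith environment variables are optional and only needed for logging):

import os

from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import ConfigurableField, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAI

# Optional: log every run to LangSmith instead of printing intermediate steps.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "..."  # your LangSmith API key

prompt = ChatPromptTemplate.from_template("Tell me a short joke about {topic}")
chat_model = ChatOpenAI(model="gpt-3.5-turbo")
llm = OpenAI(model="gpt-3.5-turbo-instruct")
anthropic_model = ChatAnthropic(model="claude-2")

# Fallbacks and runtime-configurable alternatives attach to the model step.
model = (
    chat_model
    .with_fallbacks([anthropic_model])
    .configurable_alternatives(
        ConfigurableField(id="model"),
        default_key="chat_openai",
        openai=llm,
        anthropic=anthropic_model,
    )
)

# Accept a bare topic string; invoke/stream/batch and their async versions come for free.
chain = (
    {"topic": RunnablePassthrough()}
    | prompt
    | model
    | StrOutputParser()
)

chain.invoke("ice cream")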

Next steps

To continue learning about LCEL, we recommend:

- Reading up on the full LCEL Interface, which we've only partially covered here.
- Exploring the How-to section to learn about additional composition primitives that LCEL provides.
- Looking through the Cookbook section to see LCEL in action for common use cases. A good next use case to look at would be retrieval-augmented generation.