1-Introduction
from typing_extensions import TypedDict
class State(TypedDict):
graph_state: strdef node_1(state):
print("---Node 1---")
return {"graph_state": state['graph_state'] +" I am"}
def node_2(state):
print("---Node 2---")
return {"graph_state": state['graph_state'] +" happy!"}
def node_3(state):
print("---Node 3---")
return {"graph_state": state['graph_state'] +" sad!"}LangGraph 采用了函数式编程的范式:
- 不可变状态:
- 在 LangGraph 中,节点函数不会直接修改传入的状态对象
- 每个节点接收当前状态,然后返回一个新的状态字典,而不是修改原始状态
- 纯函数:
- 示例中的节点函数 node_1、node_2、node_3 都是纯函数
- 它们仅依赖于输入参数(状态),并返回新的状态值,没有副作用
- 相同的输入总是产生相同的输出
- 状态转换而非状态修改:
- 这里没有修改原始 state 对象,而是返回一个包含新 graph_state 值的新字典
- 数据流:
- 整个图的执行是数据流动的过程
- 数据(状态)从一个节点流向下一个节点,每个节点产生新的状态
- 声明式而非命令式:
- 图的构建过程是声明式的,指定”什么”而不是”如何”
- 我们声明节点和边缘的关系,而不是命令式地指定执行步骤
1-1 Chain
DEMO 中:
- 使用
ChatMessages作为图的状态 ; - 在图的 节点中使用聊天模型 ;
- 将
Tools绑定到聊天模型 ; - 在图 节点中执行 工具调用,
Tool Calls;
1.Messages:
在
LangChain中, 消息是捕获对话中不同角色的对象
HumanMessage: 用户消息AIMessage:AI模型生成的消息SystemMessage: 给模型的指令消息ToolMessage: 工具调用的消息
每个消息可以包含:
content: 内容name: 可选的作者response_metadata: 可选的, 一个字典,用来存放元数据, 一般AiMessage中会用来说明Model Provider
from pprint import pprint
from langchain_core.messages import AIMessage, HumanMessage
messages = [AIMessage(content=f"So you said you were researching ocean mammals?", name="Model")]
messages.append(HumanMessage(content=f"Yes, that's right.",name="Lance"))
messages.append(AIMessage(content=f"Great, what would you like to learn about.", name="Model"))
messages.append(HumanMessage(content=f"I want to learn about the best place to see Orcas in the US.", name="Lance"))
for m in messages:
m.pretty_print()2.Tools
工具可以让 模型和外部进行交互
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
llm_with_tools = llm.bind_tools([multiply])可以查看某个 llm 的 tool_calls
tool_call.tool_calls3.Reducers
默认情况下,节点返回的状态会覆盖旧状态。但对于消息列表,我们希望新消息被添加到现有列表中,而不是替换它
这个会覆盖掉
from typing_extensions import TypedDict
from langchain_core.messages import AnyMessage
class MessagesState(TypedDict):
messages: list[AnyMessage]使用 reducer
from typing import Annotated
from langgraph.graph.message import add_messages
class MessagesState(TypedDict):
messages: Annotated[list[AnyMessage], add_messages]1-2 Router
tool_condition 就是一种 router
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
from langgraph.graph import MessagesState
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition
# Node
def tool_calling_llm(state: MessagesState):
return {"messages": [llm_with_tools.invoke(state["messages"])]}
# Build graph
builder = StateGraph(MessagesState)
builder.add_node("tool_calling_llm", tool_calling_llm)
builder.add_node("tools", ToolNode([multiply]))
builder.add_edge(START, "tool_calling_llm")
builder.add_conditional_edges(
"tool_calling_llm",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", END)
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))可以更强大一些.
def multiply(a: int, b: int) -> int:
"""Multiply a and b.
Args:
a: first int
b: second int
"""
return a * b
# This will be a tool
def add(a: int, b: int) -> int:
"""Adds a and b.
Args:
a: first int
b: second int
"""
return a + b
def divide(a: int, b: int) -> float:
"""Divide a and b.
Args:
a: first int
b: second int
"""
return a / b
tools = [add, multiply, divide]
llm = ChatOpenAI(model="gpt-4o")from langgraph.graph import MessagesState
from langchain_core.messages import HumanMessage, SystemMessage
# System message
sys_msg = SystemMessage(content="You are a helpful assistant tasked with performing arithmetic on a set of inputs.")
# Node
def assistant(state: MessagesState):
return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}from langgraph.graph import START, StateGraph
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
from IPython.display import Image, display
# Graph
builder = StateGraph(MessagesState)
# Define nodes: these do the work
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
react_graph = builder.compile()
# Show
display(Image(react_graph.get_graph(xray=True).draw_mermaid_png()))2-State And Memory
2-1 Multiple Schema
通常来说, 一个 graph 中所有的 nodes 都通过一个单一的 schema 进行通信.
但是有一些特殊的场景下, 刻意希望灵活一些.
- 内部的部分
Node需要传递不需要出现在Graph这个层面的Input和Output. - 我们希望为
Graph使用不同的Input/Output, 比如说Output中仅仅只包含了一个相关的键 .
1)-例子1 node_2 需要一个私有的状态
from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
class OverallState(TypedDict):
foo: int
class PrivateState(TypedDict):
baz: int
def node_1(state: OverallState) -> PrivateState:
print("---Node 1---")
return {"baz": state['foo'] + 1}
def node_2(state: PrivateState) -> OverallState:
print("---Node 2---")
return {"foo": state['baz'] + 1}
# Build graph
builder = StateGraph(OverallState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)
# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)
# Add
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))2)-例子2: Input|output 需要不一样.
class InputState(TypedDict):
question: str
class OutputState(TypedDict):
answer: str
class OverallState(TypedDict):
question: str
answer: str
notes: str
def thinking_node(state: InputState):
return {"answer": "bye", "notes": "... his is name is Lance"}
def answer_node(state: OverallState) -> OutputState:
return {"answer": "bye Lance"}
graph = StateGraph(OverallState, input=InputState, output=OutputState)
graph.add_node("answer_node", answer_node)
graph.add_node("thinking_node", thinking_node)
graph.add_edge(START, "thinking_node")
graph.add_edge("thinking_node", "answer_node")
graph.add_edge("answer_node", END)
graph = graph.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))
graph.invoke({"question":"hi"})- 其中
InputState: 仅仅包含了question字段 - 其中
OutputState: 仅仅包含了answer字段 OverallState则包含了所有.
创建图的时候, 可以使用 type hint 指定这些东西.
graph = StateGraph(OverallState, input=InputState, output=OutputState)2-2 Filtering and trimming messages
随着对话的进行, 消息的历史会不断地增长, 随着对话的进行, 消息历史会不断的增长, 这可能会导致2个主要的问题.
token的使用量增加- 响应的延迟变高
为了解决这2个问题, langGraph 直接提供了几种管理消息历史的技术:
- 消息过滤:
Message Filtering - 消息修剪:
Message Trimming
2-2-1 Message Filtering
1)-例子: 直接加个 node 用来消息修剪
from langchain_core.messages import RemoveMessage
# Nodes
def filter_messages(state: MessagesState):
# Delete all but the 2 most recent messages
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"messages": delete_messages}
def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"])]}
# Build graph
builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))- 这个代码中仅仅保留了最新的2条数据
- 核心是使用了
RemoveMessage和add_messagesreducer
通过如下代码可以测试
# Message list with a preamble
messages = [AIMessage("Hi.", name="Bot", id="1")]
messages.append(HumanMessage("Hi.", name="Lance", id="2"))
messages.append(AIMessage("So you said you were researching ocean mammals?", name="Bot", id="3"))
messages.append(HumanMessage("Yes, I know about whales. But what others should I learn about?", name="Lance", id="4"))
# Invoke
output = graph.invoke({'messages': messages})
for m in output['messages']:
m.pretty_print()2)-例子2: 传递给大模型的时候仅仅给最后1条
# Node
def chat_model_node(state: MessagesState):
return {"messages": [llm.invoke(state["messages"][-1:])]}
# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))仅仅给了最后1条, 这样还保存了完整的消息历史, 也没有修改图的状态
2-2-2 Message Trims
from langchain_core.messages import trim_messages
# Node
def chat_model_node(state: MessagesState):
messages = trim_messages(
state["messages"],
max_tokens=100,
strategy="last",
token_counter=ChatOpenAI(model="gpt-4o"),
allow_partial=False,
)
return {"messages": [llm.invoke(messages)]}
# Build graph
builder = StateGraph(MessagesState)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()
# View
display(Image(graph.get_graph().draw_mermaid_png()))****max_tokens: 允许的最大tokens数目 ;strategy: 采取last策略, 表示从最后1条消息开始保留, 直到达到token的限制 ;token_counter: 用于计算token数量的工具 ;allow_partial: 是否允许部分消息, 设为False表示已经包含了完整的消息列表 ;
可以直接用下面的代码进行测试:
# Example of trimming messages
trim_messages(
messages,
max_tokens=100,
strategy="last",
token_counter=ChatOpenAI(model="gpt-4o"),
allow_partial=False
)2-3 Chatbot with message summarization
之前的方案中, 在处理长对话的时候会面临2个主要问题:
- 随着对话长度增加,向LLM发送的上下文变得越来越长,导致Token使用量增加和延迟增加
- 没有有效的手段去 保留长期对话的 上下文信息
message summarization 不同于上面的 filtering 或者说 triming, 则是另外一个手段来保留 上下文信息.
但是成本更高. 后期可以离线设计.
- 使用
LLM产生对话的消息摘要, (running summary) ; - 保留压缩版的完整对话, 而不是简单的删除历史消息 ;
- 新消息来的时候会扩展现有的摘要 ;
1)-增加摘要字段
from langgraph.graph import MessagesState
class State(MessagesState):
summary: str2)-关键节点: call_model
from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage
# Define the logic to call the model
def call_model(state: State):
# Get summary if it exists
summary = state.get("summary", "")
# If there is summary, then we add it
if summary:
# Add summary to system message
system_message = f"Summary of conversation earlier: {summary}"
# Append summary to any newer messages
messages = [SystemMessage(content=system_message)] + state["messages"]
else:
messages = state["messages"]
response = model.invoke(messages)
return {"messages": response}- 检查是否存在摘要
- 如果存在摘要, 将其添加到系统消息中
- 调用 LLM 生成回复
3)-关键节点: summarize_conversation
def summarize_conversation(state: State):
# First, we get any existing summary
summary = state.get("summary", "")
# Create our summarization prompt
if summary:
# A summary already exists
summary_message = (
f"This is summary of the conversation to date: {summary}\n\n"
"Extend the summary by taking into account the new messages above:"
)
else:
summary_message = "Create a summary of the conversation above:"
# Add prompt to our history
messages = state["messages"] + [HumanMessage(content=summary_message)]
response = model.invoke(messages)
# Delete all but the 2 most recent messages
delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
return {"summary": response.content, "messages": delete_messages}- 检查现在的摘要
- 创建适当的摘要提示,要么扩展现在的摘要, 要么创建新的摘要
- 删除除了最近2条消息 外的所有消息, 只保留摘要和最新的消息
4)-关键节点: should_continue
from langgraph.graph import END
# Determine whether to end or summarize the conversation
def should_continue(state: State):
"""Return the next node to execute."""
messages = state["messages"]
# If there are more than six messages, then we summarize the conversation
if len(messages) > 6:
return "summarize_conversation"
# Otherwise we can just end
return END- 决定是继续对话还是进行摘要
- 消息超过6条的时候触发摘要功能.
3-UX and Human-in-the-loop.
3-1 Streaming
pass
3-2 Breakpoints
通过 interupt 可以中断. graph
3-3 Editing State and human Feedback
# 获取当前状态
state = graph.get_state(thread)
# 访问最后一条消息
last_message = current_state['values']['messages'][-1]
# 编辑消息内容
last_message['content'] = "新的消息内容"
# 更新状态
await client.threads.update_state(thread['thread_id'], {"messages": last_message})- 如果提供消息的 id,则会覆盖具有相同 ID 的现有消息
- 如果不提供 ID,则会添加新消息到列表中, 依旧会触发
add_messages
3-4 TimeTravel
时间旅行功能主要包含三个核心能力:
1)-浏览历史状态 (Browsing History)
LangGraph会保存代理执行过程中的所有状态。开发者可以使用get_state方法查看当前状态,或使用get_state_history方法获取所有历史状态。每个状态都有一个唯一的checkpoint ID标识,记录了当时的完整信息,包括:
- 状态值(values):例如消息历史
- 下一步执行节点(next)
- 配置信息(config):包含thread_id和checkpoint_id
- 元数据(metadata):包含执行步骤等信息
2)-重放 (Replaying)
重放功能允许从任何历史状态重新执行代理流程。通过指定checkpoint_id,开发者可以让代理从特定的历史点重新开始执行。这对于重现问题或理解代理的决策过程非常有用。
重放时,系统会认识到这个checkpoint已经被执行过,它会重新执行从这个点开始的所有流程。
例如,如果你有一个状态包含了用户输入”Multiply 2 and 3”,你可以从这个状态重新执行代理,得到相同的结果。
3)-分叉 (Forking)
分叉是时间旅行最强大的特性之一。它允许从任何历史状态创建一个新的执行路径,但可以修改该状态的内容。
例如,你可以从一个包含”Multiply 2 and 3”输入的状态分叉出来,将输入修改为”Multiply 5 and 3”,然后让代理从这个修改后的状态继续执行。这样,你就创建了一个全新的执行路径,但保留了原始状态的其他信息(如下一步要执行哪个节点等)。
分叉时,会创建一个新的checkpoint_id,系统认识到这是一个全新的状态,会从头执行而不是仅重放。
4)-时间旅行的应用场景
时间旅行功能在以下场景中特别有用:
- 调试:当代理出现问题时,可以回到特定状态重新执行,或尝试不同的输入
- 测试:通过从同一起点创建多个分支,测试不同输入对代理行为的影响
- 人机协作:允许人类干预代理流程,修改状态后继续执行
- 优化:分析代理在不同状态下的表现,找出可以改进的地方