Part 1: What(是什么)
1.1 系统定义
Lead Agent 是 DeerFlow 的核心智能体(Agent),负责协调任务执行、工具调用、子代理管理和记忆维护。
核心定位:
- 不是简单的聊天机器人,而是任务编排引擎
- 支持子代理并行执行(DeerFlow 的核心特性)
- 采用中间件架构实现高度可扩展的请求处理流水线
1.2 核心职责
| 职责 | What(是什么) |
|---|---|
| 任务编排 | 分解复杂任务,协调多个子代理并行执行 |
| 工具管理 | 动态加载 MCP、Sandbox、内置工具 |
| 记忆管理 | 长期记忆注入和上下文维护 |
| 澄清机制 | 智能识别歧义,主动请求用户澄清 |
| 状态管理 | 维护线程级隔离的状态(ThreadState) |
1.3 系统架构概览
用户请求
↓
Lead Agent(工厂函数创建)
├─ 模型(动态选择)
├─ 工具集(动态加载)
├─ 中间件链(10个中间件顺序执行)
└─ 系统提示词(动态生成)
↓
响应返回
1.4 关键数据结构
ThreadState(线程级状态):
class ThreadState:
sandbox: SandboxState # 沙盒执行环境
thread_data: ThreadDataState # 线程数据目录
title: str # 对话标题
artifacts: list[str] # 生成文件列表
todos: list # 任务列表(Plan Mode)
uploaded_files: list[dict] # 上传文件
viewed_images: dict # 已查看图片核心特点: 每个 thread_id 拥有独立的隔离状态,实现多租户。
Part 2: Why(为什么这样设计)← 重点
2.1 核心问题:为什么需要 Lead Agent?
简单 Agent 的问题:
用户: "分析一下我们的竞争对手"
简单 Agent: 直接搜索 → 给出结果
问题:
1. 搜索范围太广,不够深入
2. 没有分领域分析(产品、技术、市场)
3. 没有交叉验证信息
4. 单线程执行,效率低
Lead Agent 的解决方案:
用户: "分析一下我们的竞争对手"
Lead Agent:
1. 分解为 3 个子任务(产品、技术、市场)
2. 并行启动 3 个子代理同时分析
3. 收集所有结果,综合对比
4. 生成完整的竞争分析报告
Why Lead Agent 是核心:
- 编排能力: 复杂任务需要分解和协调
- 并行效率: 子代理并行执行,显著提速
- 质量保证: 多维度分析 + 交叉验证
2.2 关键决策 1: 为什么用工厂函数而非类继承?
两种方案的对比:
# ❌ 方案1: 类继承(传统方式)
class LeadAgent(BaseAgent):
def __init__(self):
self.memory = Memory()
self.sandbox = Sandbox()
# 问题: 组件硬编码,运行时无法灵活替换
# ✅ 方案2: 工厂函数(DeerFlow 方式)
def make_lead_agent(config: RunnableConfig):
model = create_chat_model(config.model_name) # 动态选择
tools = get_available_tools(config.tool_groups) # 动态加载
middleware = _build_middlewares(config) # 动态组装
return create_agent(model, tools, middleware, ...)Why 工厂函数更好:
| 维度 | 类继承 | 工厂函数 |
|---|---|---|
| 灵活性 | 低(组件硬编码) | 高(运行时组装) |
| 配置驱动 | 难(需要改代码) | 易(config 决定行为) |
| 测试性 | 差(依赖难以 Mock) | 好(依赖注入) |
| 扩展性 | 继承层次复杂 | 组合优于继承 |
核心洞察:
Agent 的能力应该是配置决定的,而不是代码写死的。
同一个 factory,传入不同配置,可以创建完全不同的 Agent。
权衡: 工厂函数丢失了面向对象的封装性,但获得了更大的灵活性。对于 Agent 这种需要高度定制的场景,灵活性更重要。
2.3 关键决策 2: 为什么用中间件架构?
问题背景: Agent 执行流程中有许多横切关注点:
- 初始化线程目录
- 处理上传文件
- 获取沙盒实例
- 生成对话标题
- 更新长期记忆
- 拦截澄清请求
不用中间件的方案:
def run_agent():
# 1. 初始化目录(耦合在核心逻辑里)
init_thread_dirs()
# 2. 获取沙盒(耦合)
acquire_sandbox()
# 3. 实际业务逻辑
response = model.generate()
# 4. 生成标题(耦合)
generate_title()
# 5. 更新记忆(耦合)
update_memory()
return response问题:
- 核心业务逻辑被淹没
- 新增功能需要修改核心代码
- 功能之间可能冲突
中间件架构的解决方案:
# 每个关注点封装为独立中间件
middlewares = [
ThreadDataMiddleware(), # 只关心目录初始化
SandboxMiddleware(), # 只关心沙盒
TitleMiddleware(), # 只关心标题
MemoryMiddleware(), # 只关心记忆
ClarificationMiddleware(), # 只关心澄清
]
# 核心逻辑保持纯净
def run_agent():
for m in middlewares:
m.before_agent(state)
response = model.generate() # 核心逻辑清晰可见
for m in middlewares:
m.after_agent(state)
return responseWhy 中间件架构:
- 关注点分离: 每个中间件只做一件事,代码清晰
- 可插拔: 通过配置动态启用/禁用中间件
- 顺序可控: 明确依赖关系,避免隐式耦合
- 可扩展: 新增功能只需添加新中间件
核心洞察:
中间件模式本质是责任链模式,将横切关注点从核心业务逻辑中剥离。
这是从 “混乱的缠绕代码” 到 “清晰的流水线” 的转变。
2.4 关键决策 3: Why 这种中间件执行顺序?
中间件执行顺序(关键!):
1. ThreadDataMiddleware ← 最先(其他依赖 thread_id)
2. UploadsMiddleware
3. SandboxMiddleware
4. DanglingToolCallMiddleware
5. SummarizationMiddleware ← 上下文摘要(可选)
6. TodoListMiddleware ← Plan Mode(可选)
7. TitleMiddleware ← 在 Memory 之前
8. MemoryMiddleware ← 在 Title 之后
9. ViewImageMiddleware ← 视觉支持(可选)
10. SubagentLimitMiddleware ← 子代理限制(可选)
11. ClarificationMiddleware ← 必须最后!
Why 这个顺序?关键依赖分析:
| 中间件 | 为什么在这个位置 | 依赖关系 |
|---|---|---|
| ThreadDataMiddleware | 第1个 | 生成 thread_id,后续所有中间件都依赖 |
| SandboxMiddleware | 第3个 | 依赖 thread_id 获取沙盒 |
| TitleMiddleware | 第7个 | 需要在 Memory 之前(Memory 要记录标题) |
| MemoryMiddleware | 第8个 | 需要在 Title 之后(记录完整的对话轮次) |
| ClarificationMiddleware | 最后 | 拦截所有澄清请求,必须在最后才能捕获所有情况 |
反例:如果顺序错了会怎样?
# 错误顺序:MemoryMiddleware 在 TitleMiddleware 之前
middlewares = [
MemoryMiddleware(), # 先记录记忆
TitleMiddleware(), # 后生成标题
]
# 结果:
# 1. MemoryMiddleware 记录对话(无标题)
# 2. TitleMiddleware 生成标题(新消息)
# 3. 记忆库里缺少标题这轮对话
# → 记忆不完整!核心洞察:
中间件顺序不是随意的,而是由数据依赖关系决定的。
明确声明顺序,比隐式依赖更容易理解和维护。
2.5 关键决策 4: Why 子代理并发限制(最多3个)?
背景: Lead Agent 可以并行启动多个子代理,但如果不加限制:
无限制的问题:
用户: "对比这 10 个云厂商"
Agent: 同时启动 10 个子代理
问题:
1. 系统资源爆炸(CPU/内存/网络)
2. 触发 LLM API 限流
3. 用户等待时间不确定(最慢的那个决定)
4. 结果可能混乱,难以综合
Why 限制为 3 个?
认知科学角度:
- 人类工作记忆是 7±2 个组块
- 但并行处理的任务,最好控制在 3-4 个
- 超过 3 个,综合结果的复杂度呈指数增长
工程角度:
- API 限流保护(避免触发限制)
- 资源保护(防止系统过载)
- 响应时间可控(等待最慢的 3 个之一)
用户体验角度:
- 3 个子任务的结果容易综合
- 用户能追踪进度(不太乱)
- 超过 3 个建议分批(多轮对话)
Why 硬性截断而非提示词限制?
# ❌ 软约束:提示词限制(不可靠)
"你最多只能调用 3 个子代理" # 模型可能不遵守
# ✅ 硬约束:中间件截断(可靠)
class SubagentLimitMiddleware:
def after_model(self, state):
tool_calls = state.messages[-1].tool_calls
task_calls = [c for c in tool_calls if c.name == "task"]
if len(task_calls) > 3:
state.messages[-1].tool_calls = tool_calls[:3] # 强制截断核心洞察:
永远不要相信提示词约束,必须有硬性兜底机制。
提示词是 “建议”,代码是 “强制”。
2.6 关键决策 5: Why 澄清机制这样设计?
问题: 当用户需求不明确时,怎么办?
方案对比:
| 方案 | Why 不选 | DeerFlow 选择 |
|---|---|---|
| 猜测执行 | 可能做错,浪费用户时间 | ❌ |
| 直接报错 | 用户体验差 | ❌ |
| 询问澄清 ✅ | 明确需求后再执行,准确性高 | ✅ |
Why ClarificationMiddleware 必须是最后一个?
middlewares = [
# ... 其他中间件 ...
ClarificationMiddleware(), # ← 必须最后!
]原因:
- 拦截所有澄清请求: 必须在所有其他处理之后,确保能捕获所有情况
- 强制中断执行: 使用
Command(goto=END)立即终止流程 - 状态保持: 不添加额外 AI 消息,保持对话状态干净
Why 用工具调用实现澄清?
# 方式1: 直接在回复中问(不稳定)
AI: "我不太明白,你能详细说说吗?"
# 问题:模型可能不这样回复,或者回复格式不统一
# 方式2: 工具调用(DeerFlow 选择)
AI: tool_call(name="ask_clarification", args={...})
# 优点:强制结构化,中间件可以拦截,前端可以特殊渲染核心洞察:
关键流程节点必须用代码控制,不能依赖模型的”自觉性”。
工具调用 + 中间件拦截 = 确定性的流程控制。
2.7 关键决策 6: Why ThreadState 这样设计?
ThreadState 的核心字段:
class ThreadState:
sandbox: SandboxState # 沙盒状态
thread_data: ThreadDataState # 目录路径
title: str # 对话标题
artifacts: list[str] # 生成文件
todos: list # 任务列表
uploaded_files: list[dict] # 上传文件
viewed_images: dict # 查看的图片Why 这些字段?
| 字段 | Why 需要 | What 解决 |
|---|---|---|
| sandbox | 沙盒需要跨轮次保持 | 代码执行环境隔离 |
| thread_data | 文件操作需要知道路径 | 虚拟路径映射 |
| title | 用户需要知道对话主题 | 自动生成标题 |
| artifacts | 需要跟踪生成的文件 | 文件去重和展示 |
| todos | Plan Mode 需要任务跟踪 | 复杂任务管理 |
| uploaded_images | 多模态需要记住看了什么图 | 避免重复处理 |
Why 线程级隔离?
Thread 1 (用户A) ←→ State 1 (sandbox_A, data_A)
Thread 2 (用户B) ←→ State 2 (sandbox_B, data_B)
- 安全性: 用户 A 不能访问用户 B 的文件
- 一致性: 每个线程有自己的沙盒和上下文
- 可扩展性: 支持多租户架构
Why 用 TypedDict + Annotated?
# 类型安全 + Reducer 注解
artifacts: Annotated[list[str], merge_artifacts]- 类型安全: 编译时检查,IDE 提示
- Reducer: 定义状态如何合并(去重、清空等)
2.8 设计哲学总结
核心原则:
- 配置优于代码: 工厂函数让 Agent 能力由配置决定
- 关注点分离: 中间件让横切关注点独立于业务逻辑
- 硬性约束: 关键流程(并发限制、澄清)用代码强制,不依赖提示词
- 线程隔离: 多租户安全的基础
- 渐进式复杂度: 只在必要时引入复杂性
关键洞察:
Lead Agent 的本质是 “编排器”,不是 “执行者”。
它的核心价值是协调子代理,而不是自己完成所有任务。
Part 3: How(如何实现)
⚠️ 注意: 这部分讲实现细节。建议在理解 Part 2(Why)之后再读。
3.1 工厂函数实现
核心流程:
def make_lead_agent(config: RunnableConfig):
# 1. 配置解析
cfg = config.get("configurable", {})
# 2. 模型创建(支持动态选择)
model = create_chat_model(
name=cfg.get("model_name"),
thinking_enabled=cfg.get("thinking_enabled", True)
)
# 3. 工具加载(支持分组)
tools = get_available_tools(
model_name=model_name,
subagent_enabled=cfg.get("subagent_enabled", False)
)
# 4. 中间件链构建(关键!)
middleware = _build_middlewares(config, model_name)
# 5. 系统提示词生成(动态模板)
system_prompt = apply_prompt_template(
subagent_enabled=subagent_enabled,
agent_name=agent_name
)
# 6. 创建 Agent
return create_agent(
model=model,
tools=tools,
middleware=middleware,
system_prompt=system_prompt,
state_schema=ThreadState,
)3.2 中间件链构建
关键:顺序决定依赖:
def _build_middlewares(config, model_name):
middlewares = [
ThreadDataMiddleware(), # 1. 先生成 thread_id
UploadsMiddleware(), # 2. 处理上传
SandboxMiddleware(), # 3. 获取沙盒(依赖 thread_id)
DanglingToolCallMiddleware(), # 4. 清理残留工具调用
]
# 可选中间件
if is_plan_mode:
middlewares.append(TodoListMiddleware())
# 关键顺序:Title 在 Memory 之前
middlewares.append(TitleMiddleware())
middlewares.append(MemoryMiddleware())
# 最后:澄清拦截
middlewares.append(ClarificationMiddleware())
return middlewares3.3 子代理限制实现
class SubagentLimitMiddleware:
def __init__(self, max_concurrent=3):
self.max_concurrent = max(2, min(4, max_concurrent)) # 限制范围
def after_model(self, state):
tool_calls = state.messages[-1].tool_calls
task_calls = [c for c in tool_calls if c.name == "task"]
if len(task_calls) > self.max_concurrent:
# 硬性截断(不是报错,是静默丢弃)
kept = task_calls[:self.max_concurrent]
other = [c for c in tool_calls if c.name != "task"]
state.messages[-1].tool_calls = kept + other
logger.warning(f"截断 {len(task_calls) - self.max_concurrent} 个超额子代理调用")3.4 澄清拦截实现
class ClarificationMiddleware:
def wrap_tool_call(self, request, handler):
# 检查是否是澄清请求
if request.tool_call["name"] != "ask_clarification":
return handler(request) # 正常执行
# 拦截并中断执行
return Command(
update={"messages": [ToolMessage(
content=formatted_question,
tool_call_id=request.tool_call["id"],
name="ask_clarification"
)]},
goto=END # 强制结束
)3.5 ThreadState Reducer 实现
# 去重且保持顺序
def merge_artifacts(existing: list, new: list) -> list:
if existing is None:
return new or []
if new is None:
return existing
return list(dict.fromkeys(existing + new)) # dict.fromkeys 去重
# 使用 Annotated 绑定 Reducer
class ThreadState:
artifacts: Annotated[list[str], merge_artifacts]总结
What
Lead Agent 是 DeerFlow 的核心编排器,负责任务分解、子代理协调、工具管理和记忆维护。
Why(重点)
- Why 工厂函数: 配置驱动,灵活性 > 封装性
- Why 中间件架构: 关注点分离,横切关注点独立
- Why 这种中间件顺序: 由数据依赖关系决定
- Why 子代理限制3个: 认知科学 + 工程保护 + 用户体验
- Why 硬性截断: 永远不要相信提示词约束
- Why 澄清拦截: 关键流程必须代码控制
- Why 线程隔离: 多租户安全的基础
How
- 工厂函数动态组装(模型、工具、中间件、提示词)
- 中间件链顺序执行(明确依赖关系)
- 硬性约束兜底(并发限制、澄清拦截)
- 类型安全 + Reducer 状态管理
核心原则: Why 比 How 重要。理解设计意图胜过实现细节。