Part 1: What(是什么)

1.1 系统定义

Lead Agent 是 DeerFlow 的核心智能体(Agent),负责协调任务执行、工具调用、子代理管理和记忆维护。

核心定位:

  • 不是简单的聊天机器人,而是任务编排引擎
  • 支持子代理并行执行(DeerFlow 的核心特性)
  • 采用中间件架构实现高度可扩展的请求处理流水线

1.2 核心职责

职责What(是什么)
任务编排分解复杂任务,协调多个子代理并行执行
工具管理动态加载 MCP、Sandbox、内置工具
记忆管理长期记忆注入和上下文维护
澄清机制智能识别歧义,主动请求用户澄清
状态管理维护线程级隔离的状态(ThreadState)

1.3 系统架构概览

用户请求
    ↓
Lead Agent(工厂函数创建)
    ├─ 模型(动态选择)
    ├─ 工具集(动态加载)
    ├─ 中间件链(10个中间件顺序执行)
    └─ 系统提示词(动态生成)
    ↓
响应返回

1.4 关键数据结构

ThreadState(线程级状态):

class ThreadState:
    sandbox: SandboxState           # 沙盒执行环境
    thread_data: ThreadDataState    # 线程数据目录
    title: str                      # 对话标题
    artifacts: list[str]            # 生成文件列表
    todos: list                     # 任务列表(Plan Mode)
    uploaded_files: list[dict]      # 上传文件
    viewed_images: dict             # 已查看图片

核心特点: 每个 thread_id 拥有独立的隔离状态,实现多租户。


Part 2: Why(为什么这样设计)← 重点

2.1 核心问题:为什么需要 Lead Agent?

简单 Agent 的问题:

用户: "分析一下我们的竞争对手"
简单 Agent: 直接搜索 → 给出结果

问题:
1. 搜索范围太广,不够深入
2. 没有分领域分析(产品、技术、市场)
3. 没有交叉验证信息
4. 单线程执行,效率低

Lead Agent 的解决方案:

用户: "分析一下我们的竞争对手"
Lead Agent:
  1. 分解为 3 个子任务(产品、技术、市场)
  2. 并行启动 3 个子代理同时分析
  3. 收集所有结果,综合对比
  4. 生成完整的竞争分析报告

Why Lead Agent 是核心:

  • 编排能力: 复杂任务需要分解和协调
  • 并行效率: 子代理并行执行,显著提速
  • 质量保证: 多维度分析 + 交叉验证

2.2 关键决策 1: 为什么用工厂函数而非类继承?

两种方案的对比:

# ❌ 方案1: 类继承(传统方式)
class LeadAgent(BaseAgent):
    def __init__(self):
        self.memory = Memory()
        self.sandbox = Sandbox()
        # 问题: 组件硬编码,运行时无法灵活替换
 
# ✅ 方案2: 工厂函数(DeerFlow 方式)
def make_lead_agent(config: RunnableConfig):
    model = create_chat_model(config.model_name)      # 动态选择
    tools = get_available_tools(config.tool_groups)   # 动态加载
    middleware = _build_middlewares(config)           # 动态组装
    return create_agent(model, tools, middleware, ...)

Why 工厂函数更好:

维度类继承工厂函数
灵活性低(组件硬编码)高(运行时组装)
配置驱动难(需要改代码)易(config 决定行为)
测试性差(依赖难以 Mock)好(依赖注入)
扩展性继承层次复杂组合优于继承

核心洞察:

Agent 的能力应该是配置决定的,而不是代码写死的。

同一个 factory,传入不同配置,可以创建完全不同的 Agent。

权衡: 工厂函数丢失了面向对象的封装性,但获得了更大的灵活性。对于 Agent 这种需要高度定制的场景,灵活性更重要。

2.3 关键决策 2: 为什么用中间件架构?

问题背景: Agent 执行流程中有许多横切关注点:

  • 初始化线程目录
  • 处理上传文件
  • 获取沙盒实例
  • 生成对话标题
  • 更新长期记忆
  • 拦截澄清请求

不用中间件的方案:

def run_agent():
    # 1. 初始化目录(耦合在核心逻辑里)
    init_thread_dirs()
 
    # 2. 获取沙盒(耦合)
    acquire_sandbox()
 
    # 3. 实际业务逻辑
    response = model.generate()
 
    # 4. 生成标题(耦合)
    generate_title()
 
    # 5. 更新记忆(耦合)
    update_memory()
 
    return response

问题:

  • 核心业务逻辑被淹没
  • 新增功能需要修改核心代码
  • 功能之间可能冲突

中间件架构的解决方案:

# 每个关注点封装为独立中间件
middlewares = [
    ThreadDataMiddleware(),    # 只关心目录初始化
    SandboxMiddleware(),       # 只关心沙盒
    TitleMiddleware(),         # 只关心标题
    MemoryMiddleware(),        # 只关心记忆
    ClarificationMiddleware(), # 只关心澄清
]
 
# 核心逻辑保持纯净
def run_agent():
    for m in middlewares:
        m.before_agent(state)
 
    response = model.generate()  # 核心逻辑清晰可见
 
    for m in middlewares:
        m.after_agent(state)
 
    return response

Why 中间件架构:

  1. 关注点分离: 每个中间件只做一件事,代码清晰
  2. 可插拔: 通过配置动态启用/禁用中间件
  3. 顺序可控: 明确依赖关系,避免隐式耦合
  4. 可扩展: 新增功能只需添加新中间件

核心洞察:

中间件模式本质是责任链模式,将横切关注点从核心业务逻辑中剥离。

这是从 “混乱的缠绕代码” 到 “清晰的流水线” 的转变。

2.4 关键决策 3: Why 这种中间件执行顺序?

中间件执行顺序(关键!):

1. ThreadDataMiddleware      ← 最先(其他依赖 thread_id)
2. UploadsMiddleware
3. SandboxMiddleware
4. DanglingToolCallMiddleware
5. SummarizationMiddleware   ← 上下文摘要(可选)
6. TodoListMiddleware        ← Plan Mode(可选)
7. TitleMiddleware           ← 在 Memory 之前
8. MemoryMiddleware          ← 在 Title 之后
9. ViewImageMiddleware       ← 视觉支持(可选)
10. SubagentLimitMiddleware  ← 子代理限制(可选)
11. ClarificationMiddleware  ← 必须最后!

Why 这个顺序?关键依赖分析:

中间件为什么在这个位置依赖关系
ThreadDataMiddleware第1个生成 thread_id,后续所有中间件都依赖
SandboxMiddleware第3个依赖 thread_id 获取沙盒
TitleMiddleware第7个需要在 Memory 之前(Memory 要记录标题)
MemoryMiddleware第8个需要在 Title 之后(记录完整的对话轮次)
ClarificationMiddleware最后拦截所有澄清请求,必须在最后才能捕获所有情况

反例:如果顺序错了会怎样?

# 错误顺序:MemoryMiddleware 在 TitleMiddleware 之前
middlewares = [
    MemoryMiddleware(),    # 先记录记忆
    TitleMiddleware(),     # 后生成标题
]
 
# 结果:
# 1. MemoryMiddleware 记录对话(无标题)
# 2. TitleMiddleware 生成标题(新消息)
# 3. 记忆库里缺少标题这轮对话
# → 记忆不完整!

核心洞察:

中间件顺序不是随意的,而是由数据依赖关系决定的。

明确声明顺序,比隐式依赖更容易理解和维护。

2.5 关键决策 4: Why 子代理并发限制(最多3个)?

背景: Lead Agent 可以并行启动多个子代理,但如果不加限制:

无限制的问题:

用户: "对比这 10 个云厂商"
Agent: 同时启动 10 个子代理

问题:
1. 系统资源爆炸(CPU/内存/网络)
2. 触发 LLM API 限流
3. 用户等待时间不确定(最慢的那个决定)
4. 结果可能混乱,难以综合

Why 限制为 3 个?

认知科学角度:

  • 人类工作记忆是 7±2 个组块
  • 并行处理的任务,最好控制在 3-4 个
  • 超过 3 个,综合结果的复杂度呈指数增长

工程角度:

  • API 限流保护(避免触发限制)
  • 资源保护(防止系统过载)
  • 响应时间可控(等待最慢的 3 个之一)

用户体验角度:

  • 3 个子任务的结果容易综合
  • 用户能追踪进度(不太乱)
  • 超过 3 个建议分批(多轮对话)

Why 硬性截断而非提示词限制?

# ❌ 软约束:提示词限制(不可靠)
"你最多只能调用 3 个子代理"  # 模型可能不遵守
 
# ✅ 硬约束:中间件截断(可靠)
class SubagentLimitMiddleware:
    def after_model(self, state):
        tool_calls = state.messages[-1].tool_calls
        task_calls = [c for c in tool_calls if c.name == "task"]
        if len(task_calls) > 3:
            state.messages[-1].tool_calls = tool_calls[:3]  # 强制截断

核心洞察:

永远不要相信提示词约束,必须有硬性兜底机制。

提示词是 “建议”,代码是 “强制”。

2.6 关键决策 5: Why 澄清机制这样设计?

问题: 当用户需求不明确时,怎么办?

方案对比:

方案Why 不选DeerFlow 选择
猜测执行可能做错,浪费用户时间
直接报错用户体验差
询问澄清明确需求后再执行,准确性高

Why ClarificationMiddleware 必须是最后一个?

middlewares = [
    # ... 其他中间件 ...
    ClarificationMiddleware(),  # ← 必须最后!
]

原因:

  1. 拦截所有澄清请求: 必须在所有其他处理之后,确保能捕获所有情况
  2. 强制中断执行: 使用 Command(goto=END) 立即终止流程
  3. 状态保持: 不添加额外 AI 消息,保持对话状态干净

Why 用工具调用实现澄清?

# 方式1: 直接在回复中问(不稳定)
AI: "我不太明白,你能详细说说吗?"
# 问题:模型可能不这样回复,或者回复格式不统一
 
# 方式2: 工具调用(DeerFlow 选择)
AI: tool_call(name="ask_clarification", args={...})
# 优点:强制结构化,中间件可以拦截,前端可以特殊渲染

核心洞察:

关键流程节点必须用代码控制,不能依赖模型的”自觉性”

工具调用 + 中间件拦截 = 确定性的流程控制。

2.7 关键决策 6: Why ThreadState 这样设计?

ThreadState 的核心字段:

class ThreadState:
    sandbox: SandboxState           # 沙盒状态
    thread_data: ThreadDataState    # 目录路径
    title: str                      # 对话标题
    artifacts: list[str]            # 生成文件
    todos: list                     # 任务列表
    uploaded_files: list[dict]      # 上传文件
    viewed_images: dict             # 查看的图片

Why 这些字段?

字段Why 需要What 解决
sandbox沙盒需要跨轮次保持代码执行环境隔离
thread_data文件操作需要知道路径虚拟路径映射
title用户需要知道对话主题自动生成标题
artifacts需要跟踪生成的文件文件去重和展示
todosPlan Mode 需要任务跟踪复杂任务管理
uploaded_images多模态需要记住看了什么图避免重复处理

Why 线程级隔离?

Thread 1 (用户A) ←→ State 1 (sandbox_A, data_A)
Thread 2 (用户B) ←→ State 2 (sandbox_B, data_B)
  • 安全性: 用户 A 不能访问用户 B 的文件
  • 一致性: 每个线程有自己的沙盒和上下文
  • 可扩展性: 支持多租户架构

Why 用 TypedDict + Annotated?

# 类型安全 + Reducer 注解
artifacts: Annotated[list[str], merge_artifacts]
  • 类型安全: 编译时检查,IDE 提示
  • Reducer: 定义状态如何合并(去重、清空等)

2.8 设计哲学总结

核心原则:

  1. 配置优于代码: 工厂函数让 Agent 能力由配置决定
  2. 关注点分离: 中间件让横切关注点独立于业务逻辑
  3. 硬性约束: 关键流程(并发限制、澄清)用代码强制,不依赖提示词
  4. 线程隔离: 多租户安全的基础
  5. 渐进式复杂度: 只在必要时引入复杂性

关键洞察:

Lead Agent 的本质是 “编排器”,不是 “执行者”

它的核心价值是协调子代理,而不是自己完成所有任务。


Part 3: How(如何实现)

⚠️ 注意: 这部分讲实现细节。建议在理解 Part 2(Why)之后再读。

3.1 工厂函数实现

核心流程:

def make_lead_agent(config: RunnableConfig):
    # 1. 配置解析
    cfg = config.get("configurable", {})
 
    # 2. 模型创建(支持动态选择)
    model = create_chat_model(
        name=cfg.get("model_name"),
        thinking_enabled=cfg.get("thinking_enabled", True)
    )
 
    # 3. 工具加载(支持分组)
    tools = get_available_tools(
        model_name=model_name,
        subagent_enabled=cfg.get("subagent_enabled", False)
    )
 
    # 4. 中间件链构建(关键!)
    middleware = _build_middlewares(config, model_name)
 
    # 5. 系统提示词生成(动态模板)
    system_prompt = apply_prompt_template(
        subagent_enabled=subagent_enabled,
        agent_name=agent_name
    )
 
    # 6. 创建 Agent
    return create_agent(
        model=model,
        tools=tools,
        middleware=middleware,
        system_prompt=system_prompt,
        state_schema=ThreadState,
    )

3.2 中间件链构建

关键:顺序决定依赖:

def _build_middlewares(config, model_name):
    middlewares = [
        ThreadDataMiddleware(),      # 1. 先生成 thread_id
        UploadsMiddleware(),          # 2. 处理上传
        SandboxMiddleware(),          # 3. 获取沙盒(依赖 thread_id)
        DanglingToolCallMiddleware(), # 4. 清理残留工具调用
    ]
 
    # 可选中间件
    if is_plan_mode:
        middlewares.append(TodoListMiddleware())
 
    # 关键顺序:Title 在 Memory 之前
    middlewares.append(TitleMiddleware())
    middlewares.append(MemoryMiddleware())
 
    # 最后:澄清拦截
    middlewares.append(ClarificationMiddleware())
 
    return middlewares

3.3 子代理限制实现

class SubagentLimitMiddleware:
    def __init__(self, max_concurrent=3):
        self.max_concurrent = max(2, min(4, max_concurrent))  # 限制范围
 
    def after_model(self, state):
        tool_calls = state.messages[-1].tool_calls
        task_calls = [c for c in tool_calls if c.name == "task"]
 
        if len(task_calls) > self.max_concurrent:
            # 硬性截断(不是报错,是静默丢弃)
            kept = task_calls[:self.max_concurrent]
            other = [c for c in tool_calls if c.name != "task"]
            state.messages[-1].tool_calls = kept + other
 
            logger.warning(f"截断 {len(task_calls) - self.max_concurrent} 个超额子代理调用")

3.4 澄清拦截实现

class ClarificationMiddleware:
    def wrap_tool_call(self, request, handler):
        # 检查是否是澄清请求
        if request.tool_call["name"] != "ask_clarification":
            return handler(request)  # 正常执行
 
        # 拦截并中断执行
        return Command(
            update={"messages": [ToolMessage(
                content=formatted_question,
                tool_call_id=request.tool_call["id"],
                name="ask_clarification"
            )]},
            goto=END  # 强制结束
        )

3.5 ThreadState Reducer 实现

# 去重且保持顺序
def merge_artifacts(existing: list, new: list) -> list:
    if existing is None:
        return new or []
    if new is None:
        return existing
    return list(dict.fromkeys(existing + new))  # dict.fromkeys 去重
 
# 使用 Annotated 绑定 Reducer
class ThreadState:
    artifacts: Annotated[list[str], merge_artifacts]

总结

What

Lead Agent 是 DeerFlow 的核心编排器,负责任务分解、子代理协调、工具管理和记忆维护。

Why(重点)

  • Why 工厂函数: 配置驱动,灵活性 > 封装性
  • Why 中间件架构: 关注点分离,横切关注点独立
  • Why 这种中间件顺序: 由数据依赖关系决定
  • Why 子代理限制3个: 认知科学 + 工程保护 + 用户体验
  • Why 硬性截断: 永远不要相信提示词约束
  • Why 澄清拦截: 关键流程必须代码控制
  • Why 线程隔离: 多租户安全的基础

How

  • 工厂函数动态组装(模型、工具、中间件、提示词)
  • 中间件链顺序执行(明确依赖关系)
  • 硬性约束兜底(并发限制、澄清拦截)
  • 类型安全 + Reducer 状态管理

核心原则: Why 比 How 重要。理解设计意图胜过实现细节。