Batched 异步批处理库原理分析
概述
Batched 是一个高性能的异步批处理库,专门用于将多个独立的请求聚合成批次进行处理,从而显著提升处理效率。该库特别适用于深度学习推理、数据库批量操作等场景。
核心架构
graph TD A[用户请求] --> B[AsyncBatchProcessor.__call__] B --> C{是否已启动?} C -->|否| D[启动批处理器 _start] C -->|是| E[确定优先级 _determine_priority] D --> E E --> F[创建BatchItem 包含content/priority/future] F --> G[添加到队列 batch_queue.extend] G --> H[检查缓存 _check_cache_and_set_result] H --> I{缓存命中?} I -->|是| J[直接返回缓存结果] I -->|否| K[放入优先级队列] K --> L[等待批处理完成 await future] subgraph "后台批处理循环" M[_process_batches] --> N[等待最优批次 optimal_batches] N --> O{队列大小 >= batch_size?} O -->|否| P[等待timeout] P --> N O -->|是| Q[获取批次项目] Q --> R[按长度/大小分组] R --> S[调用batch_func处理] S --> T{处理成功?} T -->|是| U[设置每个item的结果] T -->|否| V[设置每个item的异常] U --> W[更新统计信息] V --> W W --> N end L --> X[返回结果给用户] J --> X
核心组件
graph LR subgraph "核心组件" BP[AsyncBatchProcessor] BG[AsyncBatchGenerator] BI[AsyncBatchItem] end subgraph "配置参数" BS[batch_size: 批次大小] TO[timeout_ms: 超时时间] SBT[small_batch_threshold: 小批次阈值] PS[priority_strategy: 优先级策略] MBL[max_batch_length: 最大批次长度] end subgraph "缓存系统" AC[AsyncCache Protocol] CS[CacheStats: 缓存统计] end subgraph "统计监控" BPS[BatchProcessorStats] QS[queue_size: 队列大小] TP[total_processed: 总处理数] APT[avg_processing_time: 平均处理时间] end BP --> BG BP --> BI BG --> BI BS --> BP TO --> BG SBT --> BP PS --> BP MBL --> BG AC --> BG CS --> AC BPS --> BP QS --> BPS TP --> BPS APT --> BPS
关键原理
1. 异步批处理机制
AsyncBatchProcessor
- 职责: 主要的批处理协调器
- 核心功能:
- 接收单个或批量请求
- 管理批处理生命周期
- 协调缓存、优先级、统计等功能
AsyncBatchGenerator
- 职责: 智能批次生成器
- 关键特性:
- 使用
asyncio.PriorityQueue或asyncio.Queue - 支持基于超时的批次收集
- 支持基于长度的批次分割
- 使用
AsyncBatchItem
- 职责: 单个请求的包装器
- 核心属性:
content: 实际的请求数据future: 异步结果容器priority: 优先级(支持排序)
2. 批次收集策略
sequenceDiagram participant U as 用户 participant P as BatchProcessor participant G as BatchGenerator participant Q as 优先级队列 U->>P: 请求1 P->>G: 添加到队列 G->>Q: 放入队列 U->>P: 请求2 P->>G: 添加到队列 G->>Q: 放入队列 Note over G: 等待更多请求或超时 U->>P: 请求N P->>G: 添加到队列 G->>Q: 放入队列 Note over G: 达到batch_size或超时 G->>G: 生成最优批次 G->>P: 返回批次 P->>P: 批量处理 P->>U: 返回所有结果
3. 优先级策略
- NONE: 先进先出 (FIFO)
- LENGTH: 基于内容长度排序
- PRIORITY: 基于显式优先级值
4. 智能缓存机制
flowchart TD A[收到请求] --> B{缓存启用?} B -->|否| E[直接加入队列] B -->|是| C[检查缓存] C --> D{缓存命中?} D -->|是| F[立即返回缓存结果] D -->|否| G[加入队列并设置缓存回调] E --> H[等待批处理] G --> H H --> I[处理完成] I --> J[存入缓存] F --> K[完成] J --> K
5. 批次优化算法
批次大小优化
- 当队列大小 <
batch_size时,等待timeout_ms - 支持多批次并行处理:
n_batches = max(1, queue_size // batch_size)
长度限制处理
if self._max_batch_length:
batch_items = batch_iter_by_length(
batch_items,
max_batch_length=self._max_batch_length,
batch_size=self._batch_size
)
else:
batch_items = batch_iter(batch_items, self._batch_size)性能特性
1. 吞吐量提升
- 批处理效应: 减少函数调用开销
- 并行处理: 支持 GPU/向量化操作
- 智能等待: 平衡延迟和吞吐量
2. 内存管理
- 流式处理: 不会无限累积请求
- 及时清理: 完成的 Future 会被及时清理
- 可控队列: 队列大小可监控和控制
3. 错误处理
- 异常隔离: 单个请求失败不影响整批
- 优雅降级: 异常会正确传播到对应的 Future
使用模式
装饰器模式
@aio.dynamically(batch_size=32, timeout_ms=5.0)
async def process_texts(texts: list[str]) -> list[int]:
return [len(text) for text in texts]
# 自动批处理
result = await process_texts("hello") # 单个请求
results = await process_texts(["a", "b", "c"]) # 批量请求直接实例化
processor = AsyncBatchProcessor(
func=my_batch_function,
batch_size=32,
timeout_ms=10.0,
cache=my_cache
)
result = await processor("single_item")
results = await processor(["item1", "item2", "item3"])统计监控
graph TD A[BatchProcessorStats] --> B[queue_size: 当前队列大小] A --> C[total_processed: 总处理项目数] A --> D[total_batches: 总批次数] A --> E[avg_batch_size: 平均批次大小] A --> F[avg_processing_time: 平均处理时间] A --> G[cache_stats: 缓存统计] G --> H[gets: 缓存获取次数] G --> I[hits: 缓存命中次数] G --> J[sets: 缓存设置次数] G --> K[avg_get_time: 平均获取时间]
适用场景
- 深度学习推理: 批量处理提升 GPU 利用率
- 数据库操作: 减少网络往返次数
- API 聚合: 将多个请求合并为一个
- 图像处理: 批量处理图像数据
- 文本处理: NLP 模型的批量推理
核心优势
- 零侵入: 通过装饰器无缝集成现有代码
- 高性能: 智能批处理算法最大化吞吐量
- 灵活配置: 丰富的参数配置满足不同需求
- 缓存支持: 内置缓存机制避免重复计算
- 监控友好: 详细的统计信息便于性能调优
- 异常安全: 完善的错误处理和异常传播机制