1-介绍
使用社区能力中自带的缓存的能力.
1-1 基本使用
测试非流式输出:
if __name__ == "__main__":
# 定义测试消息
messages = [
SystemMessage("你是一个诗人,写一个 100 的诗文,以下面的输入为主题"),
HumanMessage("春天"),
]
# 测试不带缓存的版本
print("测试不带缓存的版本:")
start_time = time.time()
llm_model = model_manager.get_model("qwen-max")
for _ in range(5):
result = llm_model.invoke(messages)
end_time = time.time()
print(f"无缓存版本 (5次调用) 耗时: {end_time - start_time:.4f} 秒")
print(f"最后一次结果: {result}")
# 启用缓存
print("\n设置缓存...")
set_llm_cache(InMemoryCache())
# 测试带缓存的版本
print("测试带缓存的版本:")
start_time = time.time()
llm_model = model_manager.get_model("qwen-max")
for _ in range(5):
result = llm_model.invoke(messages)
end_time = time.time()
print(f"有缓存版本 (5次调用) 耗时: {end_time - start_time:.4f} 秒")
print(f"最后一次结果: {result}")测试不带缓存的版本:
无缓存版本 (5次调用) 耗时: 2.7098 秒
最后一次结果: content='春风轻拂绿柳枝,万物复苏展新姿。\n桃花笑颜迎蜂蝶,樱花纷飞似雪时。\n草长莺飞满眼见,溪水潺潺洗冬疲。\n春色满园关不住,一曲鸣莺报佳期。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 0, 'prompt_tokens': 0, 'total_tokens': 0, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'from-cache', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None} id='run-6f2899dd-8772-428b-bd2c-32d91233174f-0' usage_metadata={'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'input_token_details': {}, 'output_token_details': {}}
设置缓存...
测试带缓存的版本:
有缓存版本 (5次调用) 耗时: 0.0385 秒
最后一次结果: content='春风轻拂绿柳枝,万物复苏展新姿。\n桃花笑颜迎蜂蝶,樱花纷飞似雪时。\n草长莺飞满眼见,溪水潺潺洗冬疲。\n春色满园关不住,一曲鸣莺报佳期。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 0, 'prompt_tokens': 0, 'total_tokens': 0, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'from-cache', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None} id='run-11886621-9cd3-4f61-acb7-f7263d76fe83-0' usage_metadata={'input_tokens': 0, 'output_tokens': 0, 'total_tokens': 0, 'input_token_details': {}, 'output_token_details': {}}
测试流式输出
def test_streaming(model, messages, test_name="流式输出测试"):
"""
测试流式输出的性能
Args: model: LLM模型实例
messages: 发送给模型的消息列表
test_name: 测试名称
Returns: dict: 测试结果指标
""" print(f"\n{test_name}:")
start_time = time.time()
full_response = ""
first_chunk_time = None
for chunk in model.stream(messages):
current_time = time.time()
if not full_response and chunk.content:
first_chunk_time = current_time
print(f"首个内容块耗时: {first_chunk_time - start_time:.4f} 秒")
chunk_content = str(chunk.content) if chunk.content else ""
full_response += chunk_content
end_time = time.time()
total_time = end_time - start_time
ttfb = first_chunk_time - start_time if first_chunk_time else None
post_ttfb = end_time - first_chunk_time if first_chunk_time else None
print(f"完整流式输出耗时: {total_time:.4f} 秒")
if ttfb:
print(f"首字延迟 (TTFB): {ttfb:.4f} 秒")
print(f"首字后完成耗时: {post_ttfb:.4f} 秒")
print(f"总字符数: {len(full_response)}")
print(f"平均每秒输出字符数: {len(full_response) / total_time:.2f} 字符/秒")
print(f"内容前80字符: {full_response[:80]}...")
return {
"total_time": total_time,
"ttfb": ttfb,
"post_ttfb": post_ttfb,
"char_count": len(full_response),
"chars_per_second": len(full_response) / total_time,
}if __name__ == "__main__":
# 定义测试消息 - 使用固定的随机种子和温度为0,以确保一致的输出
# 这样我们可以确保缓存能够命中
model_name = "doubao" # 使用您的模型名称
stream_messages = [
SystemMessage(
"你是一个故事作家,写一个详细的短篇故事,以下面的输入为主题。"
"要求:\n"
"1. 包含生动的场景描述\n"
"2. 有丰富的感官细节\n"
"3. 有人物对话\n"
"4. 有情感变化\n"
"5. 有明确的起承转合"
),
HumanMessage("夏日的海滩和一次意外相遇"),
]
# 1. 禁用缓存的测试
print("\n============= 无缓存测试 =============")
# 确保缓存被禁用
set_llm_cache(None)
# 获取模型并设置参数
llm_model = model_manager.get_model(model_name)
# 第一次运行 - 无缓存
no_cache_results = test_streaming(llm_model, stream_messages, "无缓存流式输出测试")
# 2. 启用缓存测试
print("\n============= 有缓存测试 =============")
# 启用缓存
print("设置 LLM 缓存...")
set_llm_cache(InMemoryCache())
# 第一次运行 - 填充缓存
print("\n首次运行 (填充缓存):")
cache_filling_results = test_streaming(
llm_model, stream_messages, "缓存填充流式输出测试"
)
# 第二次运行 - 应该使用缓存
print("\n第二次运行 (应命中缓存):")
cached_results = test_streaming(llm_model, stream_messages, "缓存命中流式输出测试")
# 3. 结果对比
print("\n============= 性能对比 =============")
print(f"无缓存首字延迟 (TTFB): {no_cache_results['ttfb']:.4f} 秒")
print(f"填充缓存首字延迟 (TTFB): {cache_filling_results['ttfb']:.4f} 秒")
print(f"缓存命中首字延迟 (TTFB): {cached_results['ttfb']:.4f} 秒")
if cached_results["ttfb"] > 0 and no_cache_results["ttfb"] > 0:
print(
f"首字延迟加速比 (无缓存 vs 有缓存): {no_cache_results['ttfb'] / cached_results['ttfb']:.2f}x"
)
print(f"\n无缓存总耗时: {no_cache_results['total_time']:.4f} 秒")
print(f"填充缓存总耗时: {cache_filling_results['total_time']:.4f} 秒")
print(f"缓存命中总耗时: {cached_results['total_time']:.4f} 秒")
if cached_results["total_time"] > 0 and no_cache_results["total_time"] > 0:
print(
f"总耗时加速比 (无缓存 vs 有缓存): {no_cache_results['total_time'] / cached_results['total_time']:.2f}x"
)1-2 流式的基本原理
# 这段代码不在您分享的源文件中,而是 LangChain 内部实现的一部分
async def _astream_with_cache(
self, input: dict, cache: BaseCache, llm_string: str
) -> AsyncIterator[ChatGenerationChunk]:
# 检查缓存
prompt = self._convert_input_to_cache_key(input)
cached_generations = await cache.alookup(prompt, llm_string)
if cached_generations is not None:
# 如果找到缓存,以块的形式返回缓存的结果
# 模拟流式输出的行为
for generation in cached_generations:
yield ChatGenerationChunk(message=generation.message)
else:
# 如果未找到缓存,执行实际的流式调用
generations: List[Generation] = []
async for chunk in self._astream(input):
generations.append(chunk) # 收集所有块
yield chunk
# 更新缓存
await cache.aupdate(prompt, llm_string, generations)流式输出的关键区别:
- 缓存命中时:模拟流式输出,将缓存的完整响应拆分成块依次返回
- 缓存未命中时:执行实际的流式调用,同时收集生成的块,最后更新缓存
- 这就解释了为什么即使在缓存命中的情况下,流式输出的总时间仍可能较长 -
LangChain会模拟流式行为,而不是立即返回完整结果。
2-基本源码分析
1)-langChain 定义了一个核心缓存接口,放在 langchain_core/caches.py 文件中.
class BaseCache(ABC):
"""缓存接口,包含以下方法:
- lookup: 根据提示和llm_string查找值
- update: 根据提示和llm_string更新缓存
- clear: 清除缓存
"""
@abstractmethod
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""根据prompt和llm_string查找缓存"""
@abstractmethod
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""根据prompt和llm_string更新缓存"""
@abstractmethod
def clear(self, **kwargs: Any) -> None:
"""清除缓存"""2)-最常用的实现是 InMemoryCache
class InMemoryCache(BaseCache):
"""存储在内存中的缓存"""
def __init__(self) -> None:
"""初始化空缓存"""
self._cache: Dict[Tuple[str, str], RETURN_VAL_TYPE] = {}
def lookup(self, prompt: str, llm_string: str) -> Optional[RETURN_VAL_TYPE]:
"""查找缓存"""
return self._cache.get((prompt, llm_string), None)
def update(self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE) -> None:
"""更新缓存"""
self._cache[(prompt, llm_string)] = return_val
def clear(self, **kwargs: Any) -> None:
"""清除缓存"""
self._cache = {}3)-缓存键的构造
- 缓存的键是一个二元组: (
prompt,llm_string) ; llm_string: 模型配置的字符串表示, 包含模型名称和所有参数, 例如:temperature,top_p等 ;
llm_string 是通过将模型参数转换为有序的参数字符串来生成的.
llm_string = str(sorted(params.items())这确保了相同的模型配置总是产生相同的字符串表示
4)-生成的过程中如何使用缓存
在 _generate_with_cache 方法中(位于 language_models/chat_models.py),缓存的使用流程如下:
def _generate_with_cache(self, messages, stop=None, run_manager=None, **kwargs):
# 1. 获取缓存实例
llm_cache = self.cache if isinstance(self.cache, BaseCache) else get_llm_cache()
check_cache = self.cache or self.cache is None
# 2. 如果启用了缓存,尝试查找缓存
if check_cache and llm_cache:
llm_string = self._get_llm_string(stop=stop, **kwargs)
prompt = dumps(messages) # 序列化消息为字符串
cache_val = llm_cache.lookup(prompt, llm_string)
if isinstance(cache_val, list):
# 缓存命中,直接返回缓存的结果
return ChatResult(generations=cache_val)
# 3. 缓存未命中,执行实际的生成
result = self._generate(messages, stop=stop, **kwargs)
# 4. 更新缓存
if check_cache and llm_cache:
llm_cache.update(prompt, llm_string, result.generations)
return result5)-缓存的设置和获取
def set_llm_cache(value: Optional["BaseCache"]) -> None:
"""设置全局LLM缓存"""
global _llm_cache
_llm_cache = value
def get_llm_cache() -> "BaseCache":
"""获取全局LLM缓存"""
global _llm_cache
return _llm_cache6)-缓存解析逻辑
_resolve_cache函数用于解析缓存参数.
def _resolve_cache(cache: Union[BaseCache, bool, None]) -> Optional[BaseCache]:
"""解析缓存参数"""
if isinstance(cache, BaseCache):
# 直接使用传入的缓存对象
llm_cache = cache
elif cache is None:
# 使用全局缓存
llm_cache = get_llm_cache()
elif cache is True:
# 使用全局缓存,如果全局缓存未设置则报错
llm_cache = get_llm_cache()
if llm_cache is None:
raise ValueError("未配置全局缓存")
elif cache is False:
# 显式禁用缓存
llm_cache = None
else:
raise ValueError(f"不支持的缓存值 {cache}")
return llm_cache- 缓存命中流程:
- 构造 prompt (序列化的消息) 和 llm_string (序列化的参数)
- 查找缓存 llm_cache.lookup(prompt, llm_string)
- 如果找到,直接返回缓存结果
- 如果未找到,执行API调用,然后更新缓存
- 缓存内容:
- 缓存存储的是生成结果 (
generations),通常是 List[Generation] 类型 - 每个
Generation包含生成的消息和元数据
- 缓存存储的是生成结果 (