1-Parallelization
ๅนถ่กๅ่ฝๅ.
1)-ๆๅปบไธไธช็ฎๅ็็บฟๆงๅพ
from IPython.display import Image, display
from typing import Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
state: str
class ReturnNodeValue:
def __init__(self, node_secret: str):
self._value = node_secret
def __call__(self, state: State) -> Any:
print(f"Adding {self._value} to {state['state']}")
return {"state": [self._value]}
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))่ฟ่ก็่ฏ๏ผ ๆฏไธๆญฅ้ฝไผ่ฆ็็ถๆ.
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]2)-ไธ้ข่ฎฉ B ๅ C ๅนถ่ก
ๆนไธ่พนๅฐฑ่ก.
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))่ฟไธชๆถๅ fan-out ไป a โ b , a - > c , ็ถๅไธ่ตท fan-in ๅฐ d .
ไฝๆฏไปฃ็ ไผ็ดๆฅๅบ้, ๅ ไธบ B ๅ C ้ฝๅจ่ฟไธๆญฅ ้ๆนไบๅไธไธช StateKey . ๅฟ
้กปไฝฟ็จ Reducer
ๆฅ้ไฟกๆฏๅฆไธ:
Adding I'm A to [] Adding I'm B to ["I'm A"] Adding I'm C to ["I'm A"] An error occurred: At key 'state': Can receive only one value per step. Use an Annotated key to handle multiple values
class State(TypedDict):
# The operator.add reducer fn makes this append-only
state: Annotated[list, operator.add]3)-ไธ้ข็ไพๅญไธญ 2ๆกๅนถ่ก๏ผ ไฝๆฏไธๆก่ทฏๅพๅนถๅฆไธๆกๆดๅค
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))่พๅบๅฆไธ:
Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm A"]
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"]
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
- ๅ็ฐๅจ
B2ๆง่ก็ๆถๅ๏ผๅทฒ็ปๆง่กไบbๅc
4)-ไธ้ข็ไพๅญไธญ่ฟไธๆญฅ็จ sort_reducer ๆงๅถไบ merge ็้กบๅบ
def sorting_reducer(left, right):
""" Combines and sorts the values in a list"""
if not isinstance(left, list):
left = [left]
if not isinstance(right, list):
right = [right]
return sorted(left + right, reverse=False)
class State(TypedDict):
# sorting_reducer will sort the values in state
state: Annotated[list, sorting_reducer]
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))- ่ฟ้ไผ็ดๆฅๅฏนๆๆ็็ถๆๅผ่ฟ่กๅ จๅฑ็ๆๅบ
5)-ไธ้ข็ไพๅญไธญ็ปไบ sink ่็น็ๆนๆณ๏ผ็ปๅ จๅฑ่ฟ่กๆๅบ
class State(TypedDict):
main_output: list # ไธป่ฆ่พๅบๅญๆฎต
temp_b: Optional[str] # B่็น็ไธดๆถ่พๅบ
temp_c: Optional[str] # C่็น็ไธดๆถ่พๅบ
temp_b2: Optional[str] # B2่็น็ไธดๆถ่พๅบ
def node_b(state):
# B่็น็ๅค็้ป่พ
return {"temp_b": "I'm B"}
def node_c(state):
# C่็น็ๅค็้ป่พ
return {"temp_c": "I'm C"}
def node_b2(state):
# B2่็น็ๅค็้ป่พ
return {"temp_b2": "I'm B2"}
def sink_node(state):
# ๆ็
งๆไปฌๆๅฎ็้กบๅบๆถ้ไธดๆถๅญๆฎต
collected_values = []
# ๆๅฎๆถ้้กบๅบ
if state.get("temp_b") is not None:
collected_values.append(state["temp_b"])
if state.get("temp_b2") is not None:
collected_values.append(state["temp_b2"])
if state.get("temp_c") is not None:
collected_values.append(state["temp_c"])
# ๅฐๆถ้็ๅผๆทปๅ ๅฐไธป่พๅบๅญๆฎต
return {
"main_output": state.get("main_output", []) + collected_values,
# ๆธ
้คไธดๆถๅญๆฎต
"temp_b": None,
"temp_c": None,
"temp_b2": None
}- ไฝฟ็จไบไธดๆถๅญๆฎต๏ผ็ถๅ็ปไธๅค็
6)-ๅฎ้ ไธญ็็ฑปไพๅญ๏ผ ็ปดๅบ็พ็งๆ็ดข + web ๆ็ดข
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults
def search_web(state):
""" Retrieve docs from web search """
# Search
tavily_search = TavilySearchResults(max_results=3)
search_docs = tavily_search.invoke(state['question'])
# Format
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
def search_wikipedia(state):
""" Retrieve docs from wikipedia """
# Search
search_docs = WikipediaLoader(query=state['question'],
load_max_docs=2).load()
# Format
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"context": [formatted_search_docs]}
def generate_answer(state):
""" Node to answer a question """
# Get state
context = state["context"]
question = state["question"]
# Template
answer_template = """Answer the question {question} using this context: {context}"""
answer_instructions = answer_template.format(question=question,
context=context)
# Answer
answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content=f"Answer the question.")])
# Append it to state
return {"answer": answer}
# Add nodes
builder = StateGraph(State)
# Initialize each node with node_secret
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)
# Flow
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))2-Sub graphs
ไพๅญๆ่ฟฐ:
- ไธไธช็ณป็ปๅจๆฅๆถๆฅๅฟ ;
- ็ถๅๆง่ก2ไธช ็ฌ็ซ็
sub-tasks: ไธไธช่ด่ดฃๆป็ปlogs, ๅฆไธไธช่ด่ดฃๆฅ่ฏขfailure modes; - ็ถๅๅธๆ็จ2ไธช sub-graph ๆง่ก2ไธช็ฌ็ซ็ไปปๅก
่ฟ้ไธป่ฆๆฏไธบไบ็่งฃ graphs ไน้ดๅฆไฝ่ฟ่ก้ไฟก. ๅฉ็จไบ over-lapping keys ่ฟ็งๆๆฏ.
- ๅญๅพๅฏไปฅ่ฎฟ้ฎ ็ถๅพไธญ็
docs - ็ถๅพๅฏไปฅ่ฎฟ้ฎ ๅญๅพ็ summary ๅ failure_report
1)-Log ็็ปๆๅฆไธ
from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated
# The structure of the logs
class Log(TypedDict):
id: str
question: str
docs: Optional[List]
answer: str
grade: Optional[int]
grader: Optional[str]
feedback: Optional[str]2)-ๆ ้ๅๆๅญๅพ ่ฎพ่ฎก
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
# Failure Analysis Sub-graph
class FailureAnalysisState(TypedDict):
cleaned_logs: List[Log]
failures: List[Log]
fa_summary: str
processed_logs: List[str]
class FailureAnalysisOutputState(TypedDict):
fa_summary: str
processed_logs: List[str]
def get_failures(state):
""" Get logs that contain a failure """
cleaned_logs = state["cleaned_logs"]
failures = [log for log in cleaned_logs if "grade" in log]
return {"failures": failures}
def generate_summary(state):
""" Generate summary of failures """
failures = state["failures"]
# Add fxn: fa_summary = summarize(failures)
fa_summary = "Poor quality retrieval of Chroma documentation."
return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}
fa_builder = StateGraph(FailureAnalysisState,output=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)
graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))่ฟไธชๅญๅพๅ ๅซไธคไธช่็น๏ผ
- get_failures: ไปๆธ ๆด่ฟ็ๆฅๅฟไธญ่ทๅๅ ๅซๅคฑ่ดฅ็ๆฅๅฟ
- generate_summary: ็ๆๅคฑ่ดฅๅๆ็ๆ่ฆ
ๅญๅพ็ๆต็จๆฏ๏ผSTARTย โ get_failures โ generate_summary โ END
3)-้ฎ้ขๆ่ฆๅญๅพ(Question Summarization Sub-graph)
# Summarization subgraph
class QuestionSummarizationState(TypedDict):
cleaned_logs: List[Log]
qs_summary: str
report: str
processed_logs: List[str]
class QuestionSummarizationOutputState(TypedDict):
report: str
processed_logs: List[str]
def generate_summary(state):
cleaned_logs = state["cleaned_logs"]
# Add fxn: summary = summarize(generate_summary)
summary = "Questions focused on usage of ChatOllama and Chroma vector store."
return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}
def send_to_slack(state):
qs_summary = state["qs_summary"]
# Add fxn: report = report_generation(qs_summary)
report = "foo bar baz"
return {"report": report}
qs_builder = StateGraph(QuestionSummarizationState,output=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)
graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))่ฟไธชๅญๅพๅ ๅซไธคไธช่็น๏ผ
- generate_summary: ็ๆ้ฎ้ขๆ่ฆ
- send_to_slack: ๅฐๆ่ฆๅ้ๅฐSlack
ๅญๅพ็ๆต็จๆฏ๏ผSTARTย โ generate_summary โ send_to_slackย โ END
4)-ๅ ฅๅฃๅพ่็น่ฎพ่ฎก: Entry Graph
class EntryGraphState(TypedDict):
raw_logs: List[Log]
cleaned_logs: List[Log]
fa_summary: str # ๅชๅจๆ
้ๅๆๅญๅพไธญ็ๆ
report: str # ๅชๅจ้ฎ้ขๆ่ฆๅญๅพไธญ็ๆ
processed_logs: Annotated[List[int], add] # ๅจไธคไธชๅญๅพไธญ้ฝไผ็ๆ