1-Parallelization

ๅนถ่กŒๅŒ–่ƒฝๅŠ›.

1)-ๆž„ๅปบไธ€ไธช็ฎ€ๅ•็š„็บฟๆ€งๅ›พ

from IPython.display import Image, display
 
from typing import Any
from typing_extensions import TypedDict
 
from langgraph.graph import StateGraph, START, END
 
class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    state: str
 
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret
 
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['state']}")
        return {"state": [self._value]}
 
# Add nodes
builder = StateGraph(State)
 
# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
 
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("b", "c")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
 
display(Image(graph.get_graph().draw_mermaid_png()))

่ฟ่กŒ็š„่ฏ๏ผŒ ๆฏไธ€ๆญฅ้ƒฝไผš่ฆ†็›–็Šถๆ€.

Adding I'm A to []
Adding I'm B to ["I'm A"]
Adding I'm C to ["I'm B"]
Adding I'm D to ["I'm C"]

2)-ไธ‹้ข่ฎฉ B ๅ’Œ C ๅนถ่กŒ

ๆ”นไธ‹่พนๅฐฑ่กŒ.

builder = StateGraph(State)
 
# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
 
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
 
display(Image(graph.get_graph().draw_mermaid_png()))

่ฟ™ไธชๆ—ถๅ€™ fan-out ไปŽ a โ†’ b , a - > c , ็„ถๅŽไธ€่ตท fan-in ๅˆฐ d .

ไฝ†ๆ˜ฏไปฃ็ ไผš็›ดๆŽฅๅ‡บ้”™, ๅ› ไธบ B ๅ’Œ C ้ƒฝๅœจ่ฟ™ไธ€ๆญฅ ้œ€ๆ”นไบ†ๅŒไธ€ไธช StateKey . ๅฟ…้กปไฝฟ็”จ Reducer

ๆŠฅ้”™ไฟกๆฏๅฆ‚ไธ‹:

Adding I'm A to [] Adding I'm B to ["I'm A"] Adding I'm C to ["I'm A"] An error occurred: At key 'state': Can receive only one value per step. Use an Annotated key to handle multiple values
class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    state: Annotated[list, operator.add]

3)-ไธ‹้ข็š„ไพ‹ๅญไธญ 2ๆกๅนถ่กŒ๏ผŒ ไฝ†ๆ˜ฏไธ€ๆก่ทฏๅพ„ๅนถๅฆไธ€ๆกๆ›ดๅคš

builder = StateGraph(State)
 
# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
 
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
 
display(Image(graph.get_graph().draw_mermaid_png()))

่พ“ๅ‡บๅฆ‚ไธ‹:

Adding I'm A to [] 
Adding I'm B to ["I'm A"] 
Adding I'm C to ["I'm A"] 
Adding I'm B2 to ["I'm A", "I'm B", "I'm C"] 
Adding I'm D to ["I'm A", "I'm B", "I'm C", "I'm B2"]
  • ๅ‘็Žฐๅœจ B2 ๆ‰ง่กŒ็š„ๆ—ถๅ€™๏ผŒๅทฒ็ปๆ‰ง่กŒไบ† b ๅ’Œ c

4)-ไธ‹้ข็š„ไพ‹ๅญไธญ่ฟ›ไธ€ๆญฅ็”จ sort_reducer ๆŽงๅˆถไบ† merge ็š„้กบๅบ

def sorting_reducer(left, right):
    """ Combines and sorts the values in a list"""
    if not isinstance(left, list):
        left = [left]
 
    if not isinstance(right, list):
        right = [right]
    
    return sorted(left + right, reverse=False)
 
class State(TypedDict):
    # sorting_reducer will sort the values in state
    state: Annotated[list, sorting_reducer]
 
# Add nodes
builder = StateGraph(State)
 
# Initialize each node with node_secret 
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
 
# Flow
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)
graph = builder.compile()
 
display(Image(graph.get_graph().draw_mermaid_png()))
  • ่ฟ™้‡Œไผš็›ดๆŽฅๅฏนๆ‰€ๆœ‰็š„็Šถๆ€ๅ€ผ่ฟ›่กŒๅ…จๅฑ€็š„ๆŽ’ๅบ

5)-ไธ‹้ข็š„ไพ‹ๅญไธญ็ป™ไบ† sink ่Š‚็‚น็š„ๆ–นๆณ•๏ผŒ็ป™ๅ…จๅฑ€่ฟ›่กŒๆŽ’ๅบ

class State(TypedDict):
    main_output: list  # ไธป่ฆ่พ“ๅ‡บๅญ—ๆฎต
    temp_b: Optional[str]  # B่Š‚็‚น็š„ไธดๆ—ถ่พ“ๅ‡บ
    temp_c: Optional[str]  # C่Š‚็‚น็š„ไธดๆ—ถ่พ“ๅ‡บ
    temp_b2: Optional[str]  # B2่Š‚็‚น็š„ไธดๆ—ถ่พ“ๅ‡บ
 
def node_b(state):
    # B่Š‚็‚น็š„ๅค„็†้€ป่พ‘
    return {"temp_b": "I'm B"}
 
def node_c(state):
    # C่Š‚็‚น็š„ๅค„็†้€ป่พ‘
    return {"temp_c": "I'm C"}
 
def node_b2(state):
    # B2่Š‚็‚น็š„ๅค„็†้€ป่พ‘
    return {"temp_b2": "I'm B2"}
 
def sink_node(state):
    # ๆŒ‰็…งๆˆ‘ไปฌๆŒ‡ๅฎš็š„้กบๅบๆ”ถ้›†ไธดๆ—ถๅญ—ๆฎต
    collected_values = []
    
    # ๆŒ‡ๅฎšๆ”ถ้›†้กบๅบ
    if state.get("temp_b") is not None:
        collected_values.append(state["temp_b"])
    
    if state.get("temp_b2") is not None:
        collected_values.append(state["temp_b2"])
        
    if state.get("temp_c") is not None:
        collected_values.append(state["temp_c"])
    
    # ๅฐ†ๆ”ถ้›†็š„ๅ€ผๆทปๅŠ ๅˆฐไธป่พ“ๅ‡บๅญ—ๆฎต
    return {
        "main_output": state.get("main_output", []) + collected_values,
        # ๆธ…้™คไธดๆ—ถๅญ—ๆฎต
        "temp_b": None,
        "temp_c": None,
        "temp_b2": None
    }
  • ไฝฟ็”จไบ†ไธดๆ—ถๅญ—ๆฎต๏ผŒ็„ถๅŽ็ปŸไธ€ๅค„็†

6)-ๅฎž้™…ไธญ็š„็ฑปไพ‹ๅญ๏ผŒ ็ปดๅŸบ็™พ็ง‘ๆœ็ดข + web ๆœ็ดข

from langchain_core.messages import HumanMessage, SystemMessage
 
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.tools import TavilySearchResults
 
def search_web(state):
    
    """ Retrieve docs from web search """
 
    # Search
    tavily_search = TavilySearchResults(max_results=3)
    search_docs = tavily_search.invoke(state['question'])
 
     # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document href="{doc["url"]}">\n{doc["content"]}\n</Document>'
            for doc in search_docs
        ]
    )
 
    return {"context": [formatted_search_docs]} 
 
def search_wikipedia(state):
    
    """ Retrieve docs from wikipedia """
 
    # Search
    search_docs = WikipediaLoader(query=state['question'], 
                                  load_max_docs=2).load()
 
     # Format
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}">\n{doc.page_content}\n</Document>'
            for doc in search_docs
        ]
    )
 
    return {"context": [formatted_search_docs]} 
 
def generate_answer(state):
    
    """ Node to answer a question """
 
    # Get state
    context = state["context"]
    question = state["question"]
 
    # Template
    answer_template = """Answer the question {question} using this context: {context}"""
    answer_instructions = answer_template.format(question=question, 
                                                       context=context)    
    
    # Answer
    answer = llm.invoke([SystemMessage(content=answer_instructions)]+[HumanMessage(content=f"Answer the question.")])
      
    # Append it to state
    return {"answer": answer}
 
# Add nodes
builder = StateGraph(State)
 
# Initialize each node with node_secret 
builder.add_node("search_web",search_web)
builder.add_node("search_wikipedia", search_wikipedia)
builder.add_node("generate_answer", generate_answer)
 
# Flow
builder.add_edge(START, "search_wikipedia")
builder.add_edge(START, "search_web")
builder.add_edge("search_wikipedia", "generate_answer")
builder.add_edge("search_web", "generate_answer")
builder.add_edge("generate_answer", END)
graph = builder.compile()
 
display(Image(graph.get_graph().draw_mermaid_png()))

2-Sub graphs

ไพ‹ๅญๆ่ฟฐ:

  1. ไธ€ไธช็ณป็ปŸๅœจๆŽฅๆ”ถๆ—ฅๅฟ— ;
  2. ็„ถๅŽๆ‰ง่กŒ2ไธช ็‹ฌ็ซ‹็š„ sub-tasks: ไธ€ไธช่ดŸ่ดฃๆ€ป็ป“ logs, ๅฆไธ€ไธช่ดŸ่ดฃๆŸฅ่ฏข failure modes ;
  3. ็„ถๅŽๅธŒๆœ›็”จ2ไธช sub-graph ๆ‰ง่กŒ2ไธช็‹ฌ็ซ‹็š„ไปปๅŠก

่ฟ™้‡Œไธป่ฆๆ˜ฏไธบไบ†็†่งฃ graphs ไน‹้—ดๅฆ‚ไฝ•่ฟ›่กŒ้€šไฟก. ๅˆฉ็”จไบ† over-lapping keys ่ฟ™็งๆŠ€ๆœฏ.

  1. ๅญๅ›พๅฏไปฅ่ฎฟ้—ฎ ็ˆถๅ›พไธญ็š„ docs
  2. ็ˆถๅ›พๅฏไปฅ่ฎฟ้—ฎ ๅญๅ›พ็š„ summary ๅ’Œ failure_report

1)-Log ็š„็ป“ๆž„ๅฆ‚ไธ‹

from operator import add
from typing_extensions import TypedDict
from typing import List, Optional, Annotated
 
# The structure of the logs
class Log(TypedDict):
    id: str
    question: str
    docs: Optional[List]
    answer: str
    grade: Optional[int]
    grader: Optional[str]
    feedback: Optional[str]

2)-ๆ•…้šœๅˆ†ๆžๅญๅ›พ ่ฎพ่ฎก

from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END
 
# Failure Analysis Sub-graph
class FailureAnalysisState(TypedDict):
    cleaned_logs: List[Log]
    failures: List[Log]
    fa_summary: str
    processed_logs: List[str]
 
class FailureAnalysisOutputState(TypedDict):
    fa_summary: str
    processed_logs: List[str]
 
def get_failures(state):
    """ Get logs that contain a failure """
    cleaned_logs = state["cleaned_logs"]
    failures = [log for log in cleaned_logs if "grade" in log]
    return {"failures": failures}
 
def generate_summary(state):
    """ Generate summary of failures """
    failures = state["failures"]
    # Add fxn: fa_summary = summarize(failures)
    fa_summary = "Poor quality retrieval of Chroma documentation."
    return {"fa_summary": fa_summary, "processed_logs": [f"failure-analysis-on-log-{failure['id']}" for failure in failures]}
 
fa_builder = StateGraph(FailureAnalysisState,output=FailureAnalysisOutputState)
fa_builder.add_node("get_failures", get_failures)
fa_builder.add_node("generate_summary", generate_summary)
fa_builder.add_edge(START, "get_failures")
fa_builder.add_edge("get_failures", "generate_summary")
fa_builder.add_edge("generate_summary", END)
 
graph = fa_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

่ฟ™ไธชๅญๅ›พๅŒ…ๅซไธคไธช่Š‚็‚น๏ผš

  1. get_failures: ไปŽๆธ…ๆด—่ฟ‡็š„ๆ—ฅๅฟ—ไธญ่Žทๅ–ๅŒ…ๅซๅคฑ่ดฅ็š„ๆ—ฅๅฟ—
  2. generate_summary: ็”Ÿๆˆๅคฑ่ดฅๅˆ†ๆž็š„ๆ‘˜่ฆ

ๅญๅ›พ็š„ๆต็จ‹ๆ˜ฏ๏ผšSTARTย โ†’ get_failures โ†’ generate_summary โ†’ END

3)-้—ฎ้ข˜ๆ‘˜่ฆๅญๅ›พ(Question Summarization Sub-graph)

# Summarization subgraph
class QuestionSummarizationState(TypedDict):
    cleaned_logs: List[Log]
    qs_summary: str
    report: str
    processed_logs: List[str]
 
class QuestionSummarizationOutputState(TypedDict):
    report: str
    processed_logs: List[str]
 
def generate_summary(state):
    cleaned_logs = state["cleaned_logs"]
    # Add fxn: summary = summarize(generate_summary)
    summary = "Questions focused on usage of ChatOllama and Chroma vector store."
    return {"qs_summary": summary, "processed_logs": [f"summary-on-log-{log['id']}" for log in cleaned_logs]}
 
def send_to_slack(state):
    qs_summary = state["qs_summary"]
    # Add fxn: report = report_generation(qs_summary)
    report = "foo bar baz"
    return {"report": report}
 
qs_builder = StateGraph(QuestionSummarizationState,output=QuestionSummarizationOutputState)
qs_builder.add_node("generate_summary", generate_summary)
qs_builder.add_node("send_to_slack", send_to_slack)
qs_builder.add_edge(START, "generate_summary")
qs_builder.add_edge("generate_summary", "send_to_slack")
qs_builder.add_edge("send_to_slack", END)
 
graph = qs_builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

่ฟ™ไธชๅญๅ›พๅŒ…ๅซไธคไธช่Š‚็‚น๏ผš

  1. generate_summary: ็”Ÿๆˆ้—ฎ้ข˜ๆ‘˜่ฆ
  2. send_to_slack: ๅฐ†ๆ‘˜่ฆๅ‘้€ๅˆฐSlack

ๅญๅ›พ็š„ๆต็จ‹ๆ˜ฏ๏ผšSTARTย โ†’ generate_summary โ†’ send_to_slackย โ†’ END

4)-ๅ…ฅๅฃๅ›พ่Š‚็‚น่ฎพ่ฎก: Entry Graph

class EntryGraphState(TypedDict):
    raw_logs: List[Log]
    cleaned_logs: List[Log]
    fa_summary: str  # ๅชๅœจๆ•…้šœๅˆ†ๆžๅญๅ›พไธญ็”Ÿๆˆ
    report: str  # ๅชๅœจ้—ฎ้ข˜ๆ‘˜่ฆๅญๅ›พไธญ็”Ÿๆˆ
    processed_logs: Annotated[List[int], add]  # ๅœจไธคไธชๅญๅ›พไธญ้ƒฝไผš็”Ÿๆˆ

3-Map-Reduce

4-Research Assistant