
LangChain and LangGraph

Overview

LangChain is one of the earliest and most mature frameworks for building LLM applications, and LangGraph is its extension for stateful, multi-step workflows. Together they form one of the most complete agent development ecosystems available today.

LangChain Core Concepts

Architecture Layers

graph TD
    subgraph LangChain Ecosystem
        A[LangChain Core] --> B[LangChain Community]
        A --> C[LangGraph]
        A --> D[LangServe]
        A --> E[LangSmith]
    end

    A --> A1[Models]
    A --> A2[Prompts]
    A --> A3[Output Parsers]
    A --> A4[Chains / LCEL]
    A --> A5[Tools]
    A --> A6[Retrievers]

Model Invocation

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

# Unified model interface
openai_llm = ChatOpenAI(model="gpt-4o", temperature=0)
claude_llm = ChatAnthropic(model="claude-sonnet-4-20250514", temperature=0)

# Switching models only requires changing one line
response = openai_llm.invoke("What is AI?")

Prompt Templates

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

# System + user message template, with a placeholder for chat history
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant specialized in {domain}."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])

# Usage
chain = prompt | llm
response = chain.invoke({
    "domain": "machine learning",
    "history": [],
    "input": "Explain gradient descent"
})

LCEL (LangChain Expression Language)

LCEL is LangChain's declarative composition syntax; components are combined with the pipe operator |:

from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda

# Simple chain
chain = prompt | llm | StrOutputParser()

# RAG chain with retrieval (format_docs joins the retrieved documents into a single string)
rag_chain = (
    {
        "context": retriever | format_docs,
        "question": RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
)

# Parallel execution
from langchain_core.runnables import RunnableParallel

parallel_chain = RunnableParallel(
    summary=summary_chain,
    translation=translation_chain,
    keywords=keyword_chain,
)
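
Every LCEL composition is a Runnable, so the same chain also exposes batch and streaming calls in addition to invoke. A quick sketch against the simple chain defined above (the questions are illustrative):

# The Runnable interface: invoke, batch, and stream all take the same input shape
answer = chain.invoke({"domain": "machine learning", "history": [], "input": "What is AI?"})

questions = ["What is overfitting?", "What is regularization?"]
answers = chain.batch([{"domain": "machine learning", "history": [], "input": q} for q in questions])

# Stream string chunks as the model produces them
for token in chain.stream({"domain": "machine learning", "history": [], "input": "What is AI?"}):
    print(token, end="")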

Output Parsers

from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

class AnalysisResult(BaseModel):
    sentiment: str = Field(description="positive, negative, or neutral")
    confidence: float = Field(description="confidence score 0-1")
    key_topics: list[str] = Field(description="main topics discussed")

parser = JsonOutputParser(pydantic_object=AnalysisResult)

chain = prompt | llm | parser
result = chain.invoke({"text": "I love this product!"})
# result is a dict matching the schema, e.g. {"sentiment": "positive", "confidence": 0.95, ...}
# (use PydanticOutputParser instead if you want an AnalysisResult instance back)
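
JsonOutputParser only parses the model's output; the schema itself still has to reach the model through the prompt, typically via the parser's format instructions. A minimal sketch (the prompt wording is illustrative):

from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages([
    ("system", "Analyze the user's text.\n{format_instructions}"),
    ("human", "{text}"),
]).partial(format_instructions=parser.get_format_instructions())

chain = prompt | llm | parser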

Tool Definitions

from langchain_core.tools import tool

@tool
def search_web(query: str) -> str:
    """Search the web for current information."""
    # Implement the search logic here (search_engine is a placeholder for your backend)
    return search_engine.search(query)

@tool
def calculate(expression: str) -> float:
    """Evaluate a mathematical expression."""
    return eval(expression)  # unsafe: use a proper math expression parser in production

# Bind the tools to the model
llm_with_tools = llm.bind_tools([search_web, calculate])
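
bind_tools only tells the model which tools exist; the tool calls it returns still have to be executed by your own code (or by LangGraph, as in the next section). A minimal sketch of the manual loop, using the tools above:

from langchain_core.messages import HumanMessage, ToolMessage

messages = [HumanMessage(content="What is 17 * 23?")]
ai_msg = llm_with_tools.invoke(messages)
messages.append(ai_msg)

# Each tool call carries the tool name, its arguments, and an id to echo back
tool_map = {"search_web": search_web, "calculate": calculate}
for tool_call in ai_msg.tool_calls:
    result = tool_map[tool_call["name"]].invoke(tool_call["args"])
    messages.append(ToolMessage(content=str(result), tool_call_id=tool_call["id"]))

# Second pass: the model answers using the tool results
final_answer = llm_with_tools.invoke(messages)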

LangGraph: Stateful Workflows

LangGraph models agent workflows as directed graphs, providing precise flow control and state management.

Core Concepts

Concept | Description | Analogy
State | Global state shared by the workflow | Shared memory
Node | A processing step (a function) | One step in the process
Edge | Connection between nodes | Control-flow transition
Conditional Edge | Conditional branching | if-else
Checkpoint | Snapshot of the state | Save point

State Definition

from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    """State definition for the agent workflow."""
    # operator.add makes messages append-only: each node's return value is concatenated
    messages: Annotated[Sequence[BaseMessage], operator.add]
    current_step: str
    iteration_count: int
    final_answer: str | None

Building the Graph

from langgraph.graph import StateGraph, END
from langchain_core.messages import ToolMessage

# Map tool names to the tool objects defined earlier, for dispatch in call_tools
tool_map = {"search_web": search_web, "calculate": calculate}

def call_model(state: AgentState) -> AgentState:
    """Call the LLM with the current message history."""
    messages = state["messages"]
    response = llm_with_tools.invoke(messages)
    return {"messages": [response]}

def call_tools(state: AgentState) -> AgentState:
    """Execute the tool calls requested by the last AI message."""
    last_message = state["messages"][-1]
    tool_results = []
    for tool_call in last_message.tool_calls:
        result = tool_map[tool_call["name"]].invoke(tool_call["args"])
        tool_results.append(ToolMessage(
            content=str(result),
            tool_call_id=tool_call["id"]
        ))
    return {"messages": tool_results}

def should_continue(state: AgentState) -> str:
    """Decide whether to run tools again or finish."""
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return "end"

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tools)

# Add edges
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {"tools": "tools", "end": END}
)
workflow.add_edge("tools", "agent")

# Compile
app = workflow.compile()
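
A quick usage sketch of the compiled graph (the question is illustrative):

from langchain_core.messages import HumanMessage

result = app.invoke({
    "messages": [HumanMessage(content="What is 17 * 23?")],
    "current_step": "start",
    "iteration_count": 0,
    "final_answer": None,
})
print(result["messages"][-1].content)  # final answer after the tool loop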

Visualization

graph TD
    Start((Start)) --> Agent[Agent Node<br/>calls the LLM]
    Agent -->|has tool calls| Tools[Tools Node<br/>executes tools]
    Agent -->|no tool calls| End((End))
    Tools --> Agent
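
The same diagram can be generated from the compiled graph itself; in recent LangGraph versions something like the following works (a sketch, the exact API varies by version):

# Emit Mermaid source for the compiled graph
print(app.get_graph().draw_mermaid())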

Checkpointing (State Persistence)

from langgraph.checkpoint.sqlite import SqliteSaver
from langchain_core.messages import HumanMessage

# Persist state with SQLite (":memory:" for demos; pass a file path for durable storage)
# Note: depending on the langgraph version, from_conn_string may be a context manager
memory = SqliteSaver.from_conn_string(":memory:")
app = workflow.compile(checkpointer=memory)

# State is saved automatically on every invocation
config = {"configurable": {"thread_id": "user_123"}}
result = app.invoke(
    {"messages": [HumanMessage(content="Hello")]},
    config=config
)

# Any historical state can be inspected and restored
states = list(app.get_state_history(config))
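
Because the checkpointer keys state by thread_id, a later call with the same config continues the same conversation:

# A follow-up call on the same thread picks up the persisted message history
followup = app.invoke(
    {"messages": [HumanMessage(content="What did I just ask you?")]},
    config=config,  # same thread_id, same persisted state
)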

Human-in-the-Loop

from langgraph.graph import StateGraph, END

def human_review(state: AgentState) -> AgentState:
    """Human review node (a no-op: the graph is interrupted before it runs)."""
    return state

workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", call_tools)
workflow.add_node("human_review", human_review)

# High-risk actions require human review
# (route_by_risk is a routing function, like should_continue, returning "low_risk", "high_risk", or "done")
workflow.add_conditional_edges(
    "agent",
    route_by_risk,
    {
        "low_risk": "tools",
        "high_risk": "human_review",
        "done": END
    }
)

# Enable interruption at compile time
app = workflow.compile(
    checkpointer=memory,
    interrupt_before=["human_review"]  # pause before human_review runs
)
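
After the graph pauses, the pending state can be inspected, optionally edited, and then resumed by invoking with None as the input. A sketch of that pattern:

config = {"configurable": {"thread_id": "user_123"}}

# Run until the graph stops just before human_review
app.invoke({"messages": [HumanMessage(content="Delete all customer records")]}, config=config)

# Inspect what the agent intends to do next
snapshot = app.get_state(config)
print(snapshot.next)  # ("human_review",)

# Optionally adjust state with app.update_state(config, {...}), then resume
app.invoke(None, config=config)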

Subgraphs

# Define a subgraph: the research workflow
research_workflow = StateGraph(ResearchState)
research_workflow.add_node("search", search_node)
research_workflow.add_node("analyze", analyze_node)
research_workflow.add_node("summarize", summarize_node)
# ... configure edges

# Use the compiled subgraph as a node in the main graph
main_workflow = StateGraph(MainState)
main_workflow.add_node("research", research_workflow.compile())
main_workflow.add_node("write", write_node)
main_workflow.add_node("review", review_node)
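
To complete the sketch, the main graph still needs its own edges and a compile step (node names as above):

main_workflow.set_entry_point("research")
main_workflow.add_edge("research", "write")
main_workflow.add_edge("write", "review")
main_workflow.add_edge("review", END)

main_app = main_workflow.compile()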

Worked Example: A Research Assistant

from typing import TypedDict, Annotated
import operator

from langgraph.graph import StateGraph, END
from langchain_core.messages import HumanMessage, AIMessage

class ResearchState(TypedDict):
    messages: Annotated[list, operator.add]
    research_topic: str
    search_results: list[str]
    draft: str
    feedback: str
    revision_count: int

def plan_research(state):
    """Plan the research direction."""
    prompt = f"Plan a research outline for: {state['research_topic']}"
    response = llm.invoke(prompt)
    return {"messages": [AIMessage(content=response.content)]}

def search_sources(state):
    """Search for relevant sources (web_search is a placeholder search tool)."""
    results = web_search.invoke(state["research_topic"])
    return {"search_results": results}

def write_draft(state):
    """Write a first draft, or a revision if reviewer feedback exists."""
    context = "\n".join(state["search_results"])
    prompt = f"Write a report on {state['research_topic']}.\nSources:\n{context}"
    if state.get("feedback"):
        prompt += f"\nRevise the report according to this feedback:\n{state['feedback']}"
    draft = llm.invoke(prompt)
    return {"draft": draft.content}

def review_draft(state):
    """Review the draft and provide feedback."""
    prompt = f"Review this draft. Reply with APPROVED if it needs no changes, otherwise give feedback:\n{state['draft']}"
    feedback = llm.invoke(prompt)
    return {"feedback": feedback.content, "revision_count": state["revision_count"] + 1}

def should_revise(state):
    """Decide whether another revision is needed."""
    if state["revision_count"] >= 3:
        return "finalize"
    if "APPROVED" in state["feedback"]:
        return "finalize"
    return "revise"

# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("plan", plan_research)
graph.add_node("search", search_sources)
graph.add_node("write", write_draft)
graph.add_node("review", review_draft)

graph.set_entry_point("plan")
graph.add_edge("plan", "search")
graph.add_edge("search", "write")
graph.add_edge("write", "review")
graph.add_conditional_edges("review", should_revise, {
    "revise": "write",
    "finalize": END
})

app = graph.compile()
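
The assistant can then be run with an initial state; revision_count must be initialized, otherwise review_draft raises a KeyError (the topic is illustrative):

result = app.invoke({
    "messages": [],
    "research_topic": "The impact of LLM agents on software engineering",
    "search_results": [],
    "draft": "",
    "feedback": "",
    "revision_count": 0,
})
print(result["draft"])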

LangSmith: Observability

LangSmith provides tracing and monitoring for LangChain applications:

  • Trace visualization: inspect every step's inputs, outputs, and latency
  • Evaluation: automated testing and evaluation
  • Dataset management: build and manage test datasets
  • Feedback collection: gather user feedback
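
Tracing is typically enabled through environment variables alone, with no changes to the chains themselves. A minimal sketch (the project name is illustrative):

import os

# Enable LangSmith tracing for everything executed via LangChain / LangGraph
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "..."       # your LangSmith API key
os.environ["LANGCHAIN_PROJECT"] = "my-agent"  # traces are grouped by project

# Subsequent chain/graph invocations are traced automatically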

Best Practices

1. Model Selection

# Use different models for different tasks
cheap_llm = ChatOpenAI(model="gpt-4o-mini")    # simple tasks
powerful_llm = ChatOpenAI(model="gpt-4o")      # complex reasoning
fast_llm = ChatOpenAI(model="gpt-4o-mini")     # low-latency scenarios

2. Error Handling

import logging

from tenacity import retry, stop_after_attempt

logger = logging.getLogger(__name__)

@retry(stop=stop_after_attempt(3))
def robust_chain_invoke(chain, input_data):
    try:
        return chain.invoke(input_data)
    except Exception as e:
        logger.error(f"Chain failed: {e}")
        raise
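
Runnables also have built-in resilience helpers, so retries and fallbacks can live inside the chain itself; a sketch, assuming the models defined above:

# Retry transient failures, and fall back to Claude if the OpenAI call keeps failing
resilient_llm = powerful_llm.with_retry(stop_after_attempt=3).with_fallbacks([claude_llm])
chain = prompt | resilient_llm | StrOutputParser()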

3. Streaming Output

# Stream the graph's output node by node
async for chunk in app.astream(
    {"messages": [HumanMessage(content="Tell me about AI")]},
    config=config
):
    for node_name, output in chunk.items():
        print(f"[{node_name}]: {output}")

Summary

Choice | Scenario
LangChain Core | Simple LLM applications, RAG pipelines
LangGraph | Stateful workflows, complex agents, human-in-the-loop
LangSmith | Production environments that need observability and evaluation
Raw APIs | Minimal scenarios, or when maximum control is needed

The LangChain ecosystem's biggest strength is its completeness: from prototype to production, from single agent to multi-agent, and from development to monitoring, it offers a full-stack solution.

