标签 有状态工作流 下的文章

一、概述

LangGraph是LangChain团队开发的低级别编排框架,专为构建、管理和部署长时间运行的有状态AI代理设计,提供持久化执行、灵活控制流和全面内存管理功能,支持循环和条件分支,是开发复杂AI工作流的理想选择。

1.1 核心特点

  • 持久执行:自动保存执行状态,支持故障恢复和断点续跑
  • 循环与分支:突破传统DAG限制,支持复杂的条件判断和循环逻辑
  • 全面内存:集成短期工作内存和长期持久内存,支持跨会话状态保留
  • 人机协作:内置中断机制,允许人工介入审批或修改代理行为
  • 流支持:实时输出执行结果,包括LLM的token级流式响应
  • 可观察性:无缝集成LangSmith,提供完整的执行轨迹和状态转换可视化

二、核心概念

2.1 状态(State)

共享内存,所有节点都可读写的全局数据结构,是代理的"工作记忆"。

  • 定义为Python的TypedDict或dataclass,包含代理需要的所有信息
  • 存储原始数据而非格式化文本,确保不同节点可灵活使用
  • 示例:

    from typing import TypedDict
    class AgentState(TypedDict):
        messages: list  # 对话消息列表
        search_results: list  # 搜索结果
        user_preferences: dict  # 用户偏好

2.2 节点(Nodes)

图的基本执行单元,是接收状态并返回更新的函数。

  • 类型:

    • LLM节点:调用语言模型进行文本理解或生成
    • 工具节点:执行外部API调用、数据库查询等
    • 数据处理节点:转换或分析数据
    • 人工介入节点:暂停执行等待用户输入
  • 定义示例:

    def greet(state: AgentState) -> dict:
        return {"greeting": f"Hello, {state['user_name']}!"}

2.3 边(Edges)

节点间的连接,定义执行流的路径。

  • 普通边:始终执行固定路径
  • 条件边:根据状态决定下一步执行节点
  • 定义示例:

    # 普通边:从"start"到"greet"
    graph.add_edge("start", "greet")
    
    # 条件边:根据状态判断是执行"search"还是"reply"
    def decide_next(state: AgentState) -> str:
        return "search" if state["needs_info"] else "reply"
    graph.add_conditional_edges("greet", decide_next)

三、架构与工作原理

3.1 图结构

LangGraph使用有向图模型表示代理工作流,包含:

  • 特殊节点

    • START:执行入口点
    • END:执行结束点
  • 执行模型:基于"消息传递"的迭代执行,以离散"超步骤"(super-step)推进

3.2 状态管理

  • 短期内存:线程范围内存,随执行结束自动清除
  • 长期内存

    • 存储于独立的Store系统,支持跨会话、跨线程访问
    • 使用namespacekey组织数据,类似文件系统的目录和文件名
    • 支持多种存储后端:内存(开发)、PostgreSQL(生产)、Redis等

3.3 执行流程

  1. 初始化状态并设置入口节点
  2. 执行入口节点,更新状态
  3. 根据边的类型(普通/条件)决定下一节点
  4. 重复直到到达END或达到递归限制(默认25步)
  5. 执行过程中自动保存检查点,支持故障恢复

四、存储方案

LangGraph支持多种存储后端,满足不同场景需求:

存储类型适用场景特点配置示例
InMemoryStore开发测试速度快,无持久化store = InMemoryStore()
PostgresStore生产环境高可靠,支持事务store = PostgresStore("postgresql://user:pass@host/db")
RedisStore分布式系统高性能读写,适合缓存store = RedisStore("redis://host:port")
SQLiteStore轻量级应用文件存储,无需服务器store = SQLiteStore("langgraph.db")

长期记忆配置

from langgraph.store.postgres import PostgresStore
from langgraph.backends import CompositeBackend, StateBackend, StoreBackend

# 配置复合存储:/memories/路径下的数据持久化,其他临时存储
def make_backend(runtime):
    return CompositeBackend(
        default=StateBackend(runtime),  # 临时存储
        routes={"/memories/": StoreBackend(runtime, PostgresStore("..."))}  # 持久存储
    )

五、使用方法

5.1 安装

pip install -U langgraph  # Python版本
npm install @langchain/langgraph  # JavaScript版本

5.2 基本使用步骤

1. 定义状态

from typing import TypedDict
class ChatState(TypedDict):
    messages: list  # 对话消息列表

2. 构建图

from langgraph.graph import StateGraph, START, END
from langchain.llms import OpenAI

# 初始化图
graph = StateGraph(ChatState)

# 定义节点:调用LLM生成回复
def call_llm(state: ChatState):
    llm = OpenAI(temperature=0)
    response = llm.invoke(state["messages"])
    return {"messages": state["messages"] + [response]}

# 添加节点和边
graph.add_node("generate_response", call_llm)
graph.add_edge(START, "generate_response")
graph.add_edge("generate_response", END)

3. 编译并执行

# 编译为可执行应用
app = graph.compile()

# 执行
initial_state = {"messages": [{"role": "user", "content": "Hello!"}]}
final_state = app.invoke(initial_state)
print(final_state["messages"][-1]["content"])  # 输出AI回复

5.3 条件执行与循环

# 定义条件函数:检查是否需要调用工具
def needs_tool(state: ChatState) -> Literal["use_tool", "reply"]:
    last_message = state["messages"][-1]
    return "use_tool" if last_message.get("tool_calls") else "reply"

# 添加条件边
graph.add_conditional_edges("generate_response", needs_tool)

# 添加工具节点和循环边
graph.add_node("use_tool", tool_node)
graph.add_edge("use_tool", "generate_response")  # 循环回LLM节点

六、API参考

6.1 Graph API

核心类

  • StateGraph:构建状态驱动的图,需传入状态类型
  • MessageState:预定义的消息状态,适合聊天应用
  • Checkpointer:管理执行状态的保存和恢复

关键方法

  • add_node(name, function, **kwargs):添加节点,支持重试策略等配置
  • add_edge(from_node, to_node):添加普通边
  • add_conditional_edges(from_node, condition_func):添加条件边
  • compile(checkpointer=None):编译图为可执行应用,支持持久化配置
  • invoke(input_state, config=None):执行图,返回最终状态

6.2 Functional API (简化版)

提供更简洁的方式构建小型工作流:

from langgraph import entrypoint, task

@entrypoint
def my_agent():
    state = {"counter": 0}
    while state["counter"] < 3:
        state = task(increment)(state)  # 调用任务函数
    return state

@task
def increment(state):
    state["counter"] += 1
    return state

result = my_agent()  # 执行

七、开发指南

7.1 构建步骤

  1. 设计工作流:将问题分解为离散步骤,确定节点间依赖关系
  2. 定义状态:确定需要在步骤间共享的数据
  3. 实现节点:为每个步骤编写函数,处理输入状态并返回更新
  4. 连接节点:使用边定义执行顺序,添加必要的条件判断
  5. 添加内存:配置检查点和持久化,实现长期记忆
  6. 测试与调试:使用LangSmith可视化执行过程,检查状态转换

7.2 最佳实践

状态设计

  • 只存储必要信息,避免冗余
  • 保持状态原始,在节点内格式化输出
  • 使用描述性键名,提高可读性

节点设计

  • 单一职责:每个节点专注做一件事
  • 错误处理:为不同错误类型设置适当的处理策略(重试/回退/人工介入)
  • 外部调用:将API调用、数据库操作等封装为独立节点,便于添加重试和监控

内存管理

  • 短期数据存于状态,长期数据使用专用存储
  • 定期清理过时数据,优化存储性能
  • 使用命名空间组织长期数据,便于管理和查询

八、调试与监控

8.1 使用LangSmith集成

LangGraph无缝集成LangSmith,提供全面的可观察性:

# 启用LangSmith追踪
import os
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "..."

# 编译图时启用追踪
app = graph.compile(checkpointer=checkpointer, trace=True)

监控功能

  • 执行轨迹可视化:查看完整执行路径和状态变化
  • 性能分析:测量各节点执行时间,识别瓶颈
  • 异常检测:自动标记执行错误和异常路径
  • 交互式调试:在LangSmith Studio中检查中间状态

8.2 本地调试技巧

  • 断点打印:在节点函数中添加print语句,输出关键状态
  • 分步执行:使用graph.invoke并传入小输入,逐步验证每个节点
  • 错误处理:为节点添加详细的异常捕获和日志记录:

    def safe_node(state):
        try:
            # 正常逻辑
        except Exception as e:
            return {"error": str(e)}  # 返回错误信息而非崩溃

九、部署方案

9.1 自托管部署

使用Docker

# 安装CLI
pip install -U langgraph-cli

# 构建镜像
langgraph build --name my-agent .

# 运行
docker run -p 8124:8124 my-agent

生产配置建议

  • 使用PostgreSQL作为存储后端,确保数据持久化
  • 配置数据加密,保护敏感信息
  • 设置适当的资源限制,防止滥用
  • 使用负载均衡和水平扩展,提高吞吐量

9.2 LangSmith Cloud (原LangGraph Platform)

提供一键式云部署:

  • Lite版本:免费使用,每年限制100万节点执行
  • Enterprise版本:全功能支持,适合大规模生产环境

优势

  • 自动扩展和高可用性
  • 内置监控和告警系统
  • 开箱即用的安全与合规功能
  • 与LangSmith无缝集成,提供完整的可观察性

十、完整示例:构建天气查询代理

# 1. 安装依赖
pip install langgraph langchain openai

# 2. 导入必要模块
from typing import TypedDict, Literal
from langchain.llms import OpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver  # 内存检查点

# 3. 定义状态
class WeatherAgentState(TypedDict):
    messages: list  # 对话消息
    location: str  # 查询的城市
    weather_info: str  # 天气信息

# 4. 定义工具函数
def get_weather(location: str) -> str:
    """简化的天气查询API"""
    if location.lower() == "sf":
        return "60°F, foggy"
    elif location.lower() == "ny":
        return "90°F, sunny"
    else:
        return "Weather data not available for this location"

# 5. 定义节点
def initial_prompt(state: WeatherAgentState) -> dict:
    """询问用户想查询哪个城市的天气"""
    llm = OpenAI(temperature=0)
    response = llm.invoke([HumanMessage(content="Which city's weather would you like to check?")])
    return {"messages": [response]}

def parse_location(state: WeatherAgentState) -> dict:
    """从用户消息中提取城市名"""
    last_message = state["messages"][-1]
    location = last_message.content.strip().lower()
    return {"location": location, "messages": state["messages"] + [AIMessage(content=f"Checking weather for {location}...")]}

def get_weather_info(state: WeatherAgentState) -> dict:
    """调用天气工具获取信息"""
    weather = get_weather(state["location"])
    return {"weather_info": weather, "messages": state["messages"] + [AIMessage(content=f"Weather in {state['location']}: {weather}")]}

# 6. 构建图
graph = StateGraph(WeatherAgentState)

# 添加节点
graph.add_node("initial_prompt", initial_prompt)
graph.add_node("parse_location", parse_location)
graph.add_node("get_weather_info", get_weather_info)

# 添加边定义执行流
graph.add_edge(START, "initial_prompt")
graph.add_edge("initial_prompt", "parse_location")
graph.add_edge("parse_location", "get_weather_info")
graph.add_edge("get_weather_info", END)

# 7. 添加内存支持
checkpointer = MemorySaver()  # 使用内存检查点保存状态
app = graph.compile(checkpointer=checkpointer)

# 8. 执行代理
first_run = app.invoke({})
print("First run output:")
for msg in first_run["messages"]:
    print(f"{msg['role'].capitalize()}: {msg['content']}")

print("\nSecond run (with state persistence):")
# 第二次执行会保留之前的对话状态
second_run = app.invoke({})
for msg in second_run["messages"]:
    print(f"{msg['role'].capitalize()}: {msg['content']}")

十一、总结

LangGraph是构建复杂AI代理的强大框架,通过状态驱动的图结构,提供了持久执行、灵活控制流和全面内存管理能力。使用LangGraph,开发者可以轻松构建具有记忆、能够处理复杂逻辑的AI代理,适用于客服、研究助手、自动化工作流等多种场景。