标签 LangGraph 下的文章

在 AI 技术日新月异的今天,AI Agent(智能体)正逐渐从概念走向落地。它不仅能进行对话,更具备了思考、规划和执行任务的能力。然而,构建一个成熟的 Agent 系统,并非简单的 API 调用,而是多种核心技术协同工作的结果。

在深入开发之前,理清这些基础概念,有助于我们更好地理解 AI 系统的底层运行逻辑。


一、 智能的内核:大语言模型与交互边界

1. LLM(大语言模型):通识大脑

LLM 是 Agent 的核心引擎。它拥有强大的语言理解能力,但它是一个“静态大脑”,其知识停留在训练截止的那一刻,无法感知企业内部的私有数据。

2. Context Window(上下文窗口):短期记忆

这是模型单次交互能处理的信息上限。

  • 局限: 即使窗口再大,也不能盲目塞入所有数据。正如在数学题中加入无关的干扰信息会降低准确率一样,过长的背景会导致模型“注意力不集中”,甚至产生幻觉。

3. Prompt Engineering(提示工程):沟通的艺术

  • Zero-shot(零样本): 不给示例,直接下指令。这要求指令必须高度具体(如:从“写个政策”优化为“写个 200 字符合 GDPR 标准的隐私政策”)。
  • Few-shot(少样本): 提供几个理想的问答示例,这能有效地规范 AI 输出的语气(Tone)和特定格式。
  • Chain of Thought(思维链): 引导 AI 展示推理步骤,强制模型分配更多计算资源在逻辑推导上,从而处理复杂问题。


二、 知识的扩展:从“翻书”到“记忆”

为了让 AI 访问私有数据,我们需要构建一套“外挂硬盘”。

4. 向量数据库 vs 传统数据库

传统的 SQL 数据库是基于值或关键词的匹配(如 LIKE %vacation%)。而向量数据库(如 ChromaDB, Pinecone)则是基于含义(Meaning)的匹配。即使搜索词不一致,只要语义接近,系统就能精准定位。

5. Embeddings 与数据预处理

  • 数据切分(Chunking): 我们不能将 500GB 的文档直接塞给 AI。必须将其切成小块。
  • 重叠(Overlap): 在切分时,通常会保留一定的文字重叠。这能防止上下文在切分处丢失,从而大幅提升检索的准确性。
  • Embeddings: 将切分好的文本块转化为高维数字向量,让计算机能够以数学方式计算语义的相关性。

6. RAG(检索增强生成):知识的补丁

RAG 是目前解决 AI 幻觉的最优方案。它通过“检索 -> 增强 -> 生成”的流程,让 AI 像是在参加开卷考试:先去数据库里“翻书”找到事实,再根据事实组织答案。


三、 行动的逻辑:框架、编排与协议

7. LangChain:开发的“胶水”层

LangChain 是一个强大的抽象层,旨在简化开发流程。

  • 核心价值: 它像管道一样将模型、提示词模板和向量库连接起来。有了它,你从 OpenAI 切换到 Google Gemini 可能只需要更改一行代码,极大地提高了系统的灵活性。

8. LangGraph:有状态的“总导演”

当任务需要循环和决策时,简单的线性管道就不够用了。

  • 节点与边: LangGraph 通过节点(步骤)和边(路径)构建工作流。
  • 共享状态(State): 这是它的核心。它维护着一个在各节点间传递的“字典”,记录着当前的文档、评分等信息。基于这个状态,系统可以执行复杂逻辑:例如“如果合规分数低于 75 分,则循环回退到搜索节点重新查阅”。

9. MCP(模型上下文协议):标准化的“USB 接口”

这是连接外部工具(如 GitHub、数据库)的通用标准。它让 AI 具备了“即插即用”的能力,开发者无需为每个工具编写特定的硬编码集成,只需符合 MCP 协议,Agent 就能自主调用。


四、 总结:各组件是如何协同工作的?

构建一个完整的 AI 系统,本质上是让这些组件各司其职、形成闭环:

  1. 准备: 文档经过切分与重叠处理,通过 Embeddings 存入向量数据库
  2. 触发: 用户提问,LangChain 调度 RAG 流程,根据语义意图找回知识。
  3. 决策: LangGraph 根据当前状态判断:是直接回答,还是需要循环修正?
  4. 执行: 如果需要实时数据,通过 MCP 协议调用外部工具。
  5. 产出: LLM 结合所有事实与逻辑推理,输出最终方案。

理清了这些基石,你就已经掌握了从“对话机器人”跨越到“全能 Agent”的底层蓝图。

本文由mdnice多平台发布

使用 django 的 command 启动 langGraph Server, 但要求基于 AsyncPostgresSaver,
没有找到相关的可用的代码, 这里记录下 直接抛出代码

"""
Run the LangGraph Agent Server
"""

import asyncio
import uvicorn
from django.core.management.base import BaseCommand
from django.conf import settings
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langserve import add_routes
from langchain_core.globals import set_verbose
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from apps.ai.graph.app import AsyncGraphApp


class Command(BaseCommand):
    """
    Run the LangGraph Agent Server
    """

    help = "Starts the LangGraph Agent Server"

    def add_arguments(self, parser):
        parser.add_argument("--host", type=str, default="0.0.0.0")
        parser.add_argument("--port", type=int, default=2028)

    def handle(self, *args, **options):
        asyncio.run(self.handle_async(*args, **options))

    async def handle_async(self, *args, **options):
        """启动 Agent Server"""
        host = options["host"]
        port = options["port"]

        self.stdout.write(f"Starting Agent Server at http://{host}:{port}...")

        # Get the LangGraph application
        checkpointer = await self.get_checkpointer()
        graph_app = AsyncGraphApp().compile(checkpointer=checkpointer).app
        print(f"graph_app----------------->: {graph_app}")

        # Initialize FastAPI app
        app = FastAPI(
            title="Baby Consultant Agent",
            version="1.0",
            description="A LangGraph-based agent for baby consultation",
        )

        # Set CORS
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        set_verbose(True)
        # Add routes using LangServe
        # This exposes the graph at /agent/invoke, /agent/stream, etc.
        add_routes(
            app,
            graph_app,
            path="/agent",
        )

        # Run with Uvicorn
        config = uvicorn.Config(app, host=host, port=port)
        # 基于当前的Async Running Loop 启动unicorn
        server = uvicorn.Server(config)
        await server.serve()

    async def get_checkpointer(self):
        """获取 Checkpointer"""
        # 1. 显式创建连接池 (让它在应用生命周期内一直存活)
        connection_kwargs = {
            "autocommit": True,
            "prepare_threshold": 0,
        }

        # 使用同步的 ConnectionPool
        pool = AsyncConnectionPool(
            conninfo=settings.LANGGRAPH_POSTGRES_CONNECTION_STRING,
            max_size=20,
            kwargs=connection_kwargs,
        )

        # 2. 将连接池传入构造函数
        checkpointer = AsyncPostgresSaver(pool)

        # 3. 初始化数据库表
        await checkpointer.setup()

        return checkpointer

使用 django 的 command 启动 langGraph Server, 但要求基于 AsyncPostgresSaver,
没有找到相关的可用的代码, 这里记录下 直接抛出代码

"""
Run the LangGraph Agent Server
"""

import asyncio
import uvicorn
from django.core.management.base import BaseCommand
from django.conf import settings
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langserve import add_routes
from langchain_core.globals import set_verbose
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from apps.ai.graph.app import AsyncGraphApp


class Command(BaseCommand):
    """
    Run the LangGraph Agent Server
    """

    help = "Starts the LangGraph Agent Server"

    def add_arguments(self, parser):
        parser.add_argument("--host", type=str, default="0.0.0.0")
        parser.add_argument("--port", type=int, default=2028)

    def handle(self, *args, **options):
        asyncio.run(self.handle_async(*args, **options))

    async def handle_async(self, *args, **options):
        """启动 Agent Server"""
        host = options["host"]
        port = options["port"]

        self.stdout.write(f"Starting Agent Server at http://{host}:{port}...")

        # Get the LangGraph application
        checkpointer = await self.get_checkpointer()
        graph_app = AsyncGraphApp().compile(checkpointer=checkpointer).app
        print(f"graph_app----------------->: {graph_app}")

        # Initialize FastAPI app
        app = FastAPI(
            title="Baby Consultant Agent",
            version="1.0",
            description="A LangGraph-based agent for baby consultation",
        )

        # Set CORS
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        set_verbose(True)
        # Add routes using LangServe
        # This exposes the graph at /agent/invoke, /agent/stream, etc.
        add_routes(
            app,
            graph_app,
            path="/agent",
        )

        # Run with Uvicorn
        config = uvicorn.Config(app, host=host, port=port)
        # 基于当前的Async Running Loop 启动unicorn
        server = uvicorn.Server(config)
        await server.serve()

    async def get_checkpointer(self):
        """获取 Checkpointer"""
        # 1. 显式创建连接池 (让它在应用生命周期内一直存活)
        connection_kwargs = {
            "autocommit": True,
            "prepare_threshold": 0,
        }

        # 使用同步的 ConnectionPool
        pool = AsyncConnectionPool(
            conninfo=settings.LANGGRAPH_POSTGRES_CONNECTION_STRING,
            max_size=20,
            kwargs=connection_kwargs,
        )

        # 2. 将连接池传入构造函数
        checkpointer = AsyncPostgresSaver(pool)

        # 3. 初始化数据库表
        await checkpointer.setup()

        return checkpointer

🚀 快速回答 (Golden Answer)

从 0 到 1 搭建智能体的核心逻辑是 “明确需求 → 选对工具 → 配置闭环 → 测试优化”:无需复杂编程,优先用零代码 / 低代码工具(如 Coze、LangGraph),先定义 “智能体要解决的具体任务”(如自动化办公、设计辅助),再通过 “设定角色 → 拆解任务 → 配置工具 → 添加反思逻辑” 完成搭建,最终通过测试迭代优化效果。核心是 “让智能体精准匹配需求”,而非追求技术复杂度。

一、前置认知:先搞懂 “搭建智能体” 的核心前提

1.1 搭建智能体的核心目标:解决 “具体问题”

智能体的核心价值是 “自主完成复杂任务”,搭建前必须明确 “它要帮你做什么”,避免盲目搭建。常见落地场景:

  • 个人场景:自动化周报生成、文献整理助手、学习笔记总结、购物比价监控;
  • 职场场景:客户咨询智能客服、销售数据自动分析、市场调研报告生成、设计批量出图;
  • 垂直场景:电商运营助手(商品上架 + 文案生成)、科研辅助(数据检索 + 分析)、教育答疑(学科知识点梳理)。

1.2 搭建智能体的核心逻辑:“感知 - 规划 - 行动 - 反思” 闭环

无论用哪种工具,智能体的底层逻辑都是这四个环节的循环,搭建的本质就是 “配置这四个环节的规则”:

  • 感知:让智能体 “接收信息”(如用户需求、外部数据、工具反馈);
  • 规划:让智能体 “拆解任务”(如 “生成销售报告” 拆解为 “收集数据 → 清洗数据 → 分析 → 排版”);
  • 行动:让智能体 “执行步骤”(如调用 Excel、API 接口、设计工具完成具体操作);
  • 反思:让智能体 “修正错误”(如数据缺失时重新收集,格式错误时自动调整)。

1.3 零基础搭建的核心原则:“工具优先,不造轮子”

无需从零开发大模型或底层架构,当前主流工具已提供 “可视化配置 + 现成组件”,零基础只需聚焦 “需求匹配” 和 “流程配置”,核心原则:

  1. 优先选零代码工具(如 Coze、Notion AI Agent),快速验证需求;
  2. 复杂场景再用低代码工具(如 LangGraph、AutoGen),灵活适配个性化需求;
  3. 先搭建 “最小可用版本”(仅满足核心任务),再逐步添加功能。

二、工具选择:零基础必看的 “工具选型矩阵”

不同工具的门槛、功能、适配场景差异较大,结合 “零基础友好度” 和 “落地实用性”,整理核心工具对比:

工具名称技术门槛核心优势适配场景学习成本
Coze(扣子)零代码可视化配置,插件生态丰富(支持 Excel、数据库、设计工具等),可直接发布为小程序 / APP个人助手、职场自动化、客服机器人低(1-2 小时掌握基础配置)
Notion AI Agent零代码与文档深度融合,支持笔记整理、报告生成、任务管理,操作简单直观学习助手、文献整理、文档自动化极低(熟悉 Notion 即可上手)
LangGraph低代码(Python 基础)状态控制极强,支持复杂循环逻辑,适配高定制化任务科研辅助、复杂数据分析、自动化办公流中(需掌握基础 Python 和 Prompt 技巧)
AutoGen低代码(Python 基础)支持多智能体协作,角色分工明确,降低复杂任务的配置难度软件工程、内容生产流水线、多步骤商业分析中(需理解多智能体协同逻辑)
Make(原 Integromat)零代码专注工具集成,支持 1000 + 款软件对接,擅长自动化工作流串联跨平台自动化(如微信 + Excel + 邮件协同)低(重点学习工具对接逻辑)

💡 零基础优先推荐:Coze(功能全、生态完善)或 Notion AI Agent(简单直观);若需处理复杂任务,再学习 LangGraph(低代码门槛)。

三、分步实操:用 Coze 从零搭建 “自动化周报生成智能体”(零代码案例)

以 “自动收集 Excel 数据 → 生成周报 → 排版导出” 为核心任务,用 Coze 完成搭建,全程可视化操作,10 分钟即可完成基础版本:

3.1 第一步:明确需求与角色设定

  1. 核心需求:用户上传 Excel 销售数据后,智能体自动计算核心指标(销售额、增长率、Top3 产品),生成结构化周报,支持 Word 导出;
  2. 角色设定:在 Coze 后台 “角色定义” 中填写 ——“你是职场销售周报生成助手,擅长从 Excel 数据中提取核心信息,生成逻辑清晰、格式规范的周报,语言正式专业”;
  3. 补充提示:添加 “周报格式要求”(如包含 “本周概况、核心数据、趋势分析、下周计划” 模块),让智能体输出更精准。

3.2 第二步:配置 “工具”(让智能体具备执行能力)

智能体需要对接 Excel 和 Word 工具,才能完成数据读取和导出,操作步骤:

  1. 在 Coze “插件市场” 中搜索 “Excel 解析” 和 “Word 导出” 插件,点击 “启用”;
  2. 配置插件权限:授权 Coze 读取用户上传的 Excel 文件(仅读取权限,保障数据安全);
  3. 测试工具连通性:上传一份测试 Excel 数据,点击 “测试插件”,确认智能体能正常提取数据。

3.3 第三步:设计 “任务流程”(拆解执行步骤)

在 Coze “流程设计” 模块,用可视化拖拽配置任务步骤,核心流程:

  1. 触发条件:用户上传 Excel 文件并发送 “生成周报” 指令;
  2. 步骤 1:调用 “Excel 解析” 插件,提取数据(销售额、产品名称、日期等);
  3. 步骤 2:智能体计算核心指标(本周总销售额、环比增长率、Top3 热销产品);
  4. 步骤 3:按照预设格式生成周报文本;
  5. 步骤 4:调用 “Word 导出” 插件,生成周报文件并反馈给用户。

3.4 第四步:添加 “反思逻辑”(让智能体能修正错误)

为避免数据缺失或格式错误,添加简单反思规则:

  1. 在 “流程设计” 中添加 “判断节点”:若 Excel 数据缺失关键字段(如 “销售额”),则自动向用户发送 “请补充包含销售额字段的 Excel 文件”;
  2. 添加 “格式校验”:生成周报到导出前,自动检查是否包含预设的 4 个模块,缺失则补充完善。

3.5 第五步:测试与发布

  1. 测试验证:上传测试 Excel 数据,发送 “生成周报” 指令,查看智能体是否能正确完成全流程,重点检查数据计算准确性和格式规范性;
  2. 优化迭代:若存在格式混乱,补充 “周报格式细则”(如字体、行距、标题层级);若数据计算错误,调整指标计算规则;
  3. 发布使用:测试通过后,点击 “发布”,可生成小程序 / 网页链接,直接在工作中使用。

四、进阶优化:让智能体更 “好用” 的 3 个关键技巧

4.1 精准 Prompt 优化:提升输出质量

在角色定义中补充 “具体约束”,而非模糊描述,示例:

  • 差 Prompt:“生成专业的周报”;
  • 好 Prompt:“生成销售周报,包含本周概况(30 字内)、核心数据(表格呈现)、趋势分析(200 字内)、下周计划(3 条核心动作),语言正式,避免口语化,数据保留 2 位小数”。

4.2 个性化适配:对接个人 / 企业知识库

若智能体需要适配特定业务(如公司产品知识、个人工作习惯),可在 Coze 中上传 “知识库”(如公司产品手册、个人工作模板),让智能体学习后输出更贴合需求的结果。

4.3 多智能体协作:解决复杂任务

对于 “市场调研 → 数据分析 → 报告生成” 这类复杂任务,可搭建 “多智能体团队”:

  • 调研智能体:负责收集市场数据;
  • 分析智能体:负责数据计算与趋势分析;
  • 撰写智能体:负责生成最终报告; 在 Coze 中配置 “智能体间通信规则”,让它们协同完成任务,提升效率。

五、避坑指南:零基础搭建常见问题与解决方案

常见问题核心原因解决方案
智能体输出不符合预期(如格式混乱)角色定义模糊,缺乏明确约束补充具体的输出格式、语言风格、内容模块要求,用示例引导(如 “参考以下示例格式生成:【本周概况】XXX”)
智能体无法完成复杂任务(如数据计算错误)任务拆解不细致,工具配置不当将复杂任务拆分为更细的原子步骤,检查工具参数配置(如数据字段匹配),添加人工校验节点
智能体出现 “幻觉”(如编造数据)缺乏真实数据支撑,规则约束不足强制智能体仅基于用户上传的数据输出,添加 “禁止编造数据” 的规则,关键数据要求标注来源
工具调用失败(如无法读取 Excel)插件权限不足,文件格式不兼容重新授权插件权限,统一文件格式(如 Excel 保存为.xlsx 格式),测试工具连通性

六、FAQ:零基础搭建智能体最关心的核心问题

Q1:搭建智能体需要懂编程吗?

答:不需要。 零代码工具(如 Coze、Notion AI Agent)通过可视化拖拽和文字描述即可完成搭建;若需高定制化,仅需掌握基础 Python(低代码工具),但零基础可先从简单工具入手,无需一开始学习编程。

Q2:搭建智能体需要花钱吗?

答:个人非商业使用基本免费。 Coze、Notion AI Agent 等工具对个人用户提供免费额度(足够日常使用);商业场景或高频率使用可能需要付费升级,但零基础入门无需付费。

Q3:智能体的数据安全有保障吗?

答:选择正规工具即可保障。 主流工具(如 Coze、Notion)均有数据加密机制,且可设置 “仅自己可见”;避免上传敏感数据(如身份证、银行卡信息),若需处理企业数据,可选择企业版工具(提供私有部署选项)。

Q4:搭建完成后,能修改功能吗?

答:可以。 所有主流工具均支持 “二次编辑”,可随时修改角色定义、任务流程、工具配置;建议根据使用反馈定期优化,让智能体更贴合需求。

七、核心总结

从 0 到 1 搭建智能体的核心不是 “技术攻关”,而是 “需求聚焦” 与 “流程拆解”:零基础用户无需畏惧,优先选择零代码工具,先明确 “智能体要解决的具体问题”,再通过 “角色定义 → 工具配置 → 流程设计 → 测试优化” 的步骤逐步落地,先搭建 “最小可用版本” 验证需求,再逐步进阶优化。

智能体的价值在于 “解放重复劳动”,搭建的关键是让它成为 “贴合自己需求的助手”,而非追求 “功能全而杂”。随着工具生态的完善,“人人都能搭建智能体” 已成为趋势,掌握这种 “人机协同” 的搭建能力,将大幅提升个人与工作效率。

参考文献与工具资源

  1. Coze(扣子)官方文档:《零代码智能体搭建指南》
  2. LangGraph 官方教程:《低代码智能体开发实战》
  3. 腾讯云《智能体落地实践白皮书》(2025)
  4. 推荐学习平台:Coze 学院、Notion AI 帮助中心、GitHub AutoGen 示例仓库

核心关键词

智能体搭建、从 0 到 1、零代码智能体、Coze、LangGraph、自动化办公、智能体工具、人机协同

摘要: 随着大模型从“对话时代”迈向“任务执行时代”,智能体工作流(Agentic Workflow)已成为企业级 AI 应用的核心。本文深度拆解 Agent 的感知、规划、记忆与行动闭环,结合 GartnerMcKinsey 的最新权威数据,为开发者提供一套可落地的 AI 智能体架构指南。

🚀 快速回答 (Golden Answer)

智能体工作流 (Agent Workflow) 是将大语言模型(LLM)从静态文本生成工具转化为动态任务执行核心的编排逻辑。其核心在于引入了“感知-决策-行动-观测”的闭环机制。通过思维链(CoT)自我反思(Self-Reflection),Agent 能够自主拆解复杂目标并在动态环境中实现闭环执行。


一、 认知重塑:从大模型到智能体的技术演进

1.1 范式转移:第二代 AI 的兴起

根据 Stanford HAI 定义的演进路径,AI 正在经历从“概率拟合”到“目标达成”的跨越。
Standard_LLM_vs_AI_Agent.png

  • Gartner 趋势预测: 根据 Gartner 发布的《2026 年十大战略技术趋势》“多智能体系统 (MAS)” 被列为年度核心趋势,预测到 2028 年,全球 90% 的 B2B 采购将由 AI 智能体介入。
  • McKinsey 调研数据: McKinsey Digital 2025 年末报告《The state of AI in 2025》显示,全球 88% 的组织已常规使用 AI,且 62% 的受访企业正积极部署 AI 智能体。

二、 核心架构:驱动 Agent 的“四元引擎”

AI_Agent_Four_Engines_Blueprint.png

2.1 规划系统 (Planning)

利用 LLM 的逻辑推理能力,将模糊指令拆解为原子任务。其数学表达为路径的最优概率选择:

2.2 记忆系统 (Memory)

  • 短期记忆: 依赖上下文窗口(Context Window)维护当前任务状态。
  • 长期记忆: 结合向量数据库实现 RAG(检索增强生成)

2.3 执行系统 (Action)

通过 MCP 协议等标准接口,Agent 操作外部软件、调用 API 或执行 Python 脚本,打破“离线”限制。

2.4 反思系统 (Reflection)

通过对比“预期输出”与“实际观测(Observation)”,启动自我修正逻辑:


三、 主流开发框架与选型矩阵

针对不同业务需求,2026 年的主流开发路径如下表所示:

框架名称技术路线核心优势推荐场景
LangGraph有向循环图状态控制极强,支持复杂循环自动编程、复杂审计
Coze (扣子)零代码可视化门槛极低,插件生态丰富个人助理、社交媒体
AutoGen多智能体协作角色分工明确,降低幻觉软件工程、内容流水线

ChatGPT Image 2026年1月20日 11_29_12.jpeg

国内已有团队开始围绕“Agentic Workflow”构建企业级智能体产品线,其中包括专注于智能体工作流的公司——智能体来了,其重点在于将规划、工具调用与工作流调度整合为可交付模块。

这一类公司代表着智能体从研究走向产业化的趋势。


四、 实战视角:构建具备“自愈能力”的原型

以下是基于 Python 的工业级 Agent 逻辑骨架,展示了如何处理执行异常并触发自动重规划(Re-planning)。

"""
# 依赖环境:langchain>=0.3.0, openai>=1.50.0
# 官方参考文档: https://python.langchain.com/
"""
from typing import List, Dict

class LogicAgent:
    def __init__(self, model_name="deepseek-v3"):
        self.model = model_name
        self.history = []

    def run_workflow(self, task_goal: str):
        # 1. 初始规划 (Planning)
        current_plan = self.generate_initial_plan(task_goal)
        
        while not self.is_task_complete(current_plan):
            # 2. 执行原子任务 (Action)
            step = current_plan.get_next_step()
            observation = self.execute_step(step)
            
            # 3. 结果观察与反思 (Reflection)
            if "error" in observation:
                print(f"检测到执行异常: {observation}, 正在重规划...")
                current_plan = self.replan(task_goal, observation)
            else:
                self.history.append(observation)
        
        return self.finalize_output()
工程化优化提示: 在实际生产环境中,建议添加 最大迭代次数(Max_Iterations)超时机制(Timeout),避免 Agent 在 Observation 环节获取模糊反馈时陷入逻辑死循环。

五、 FAQ:AI 智能体落地路径与优化技巧

Q1:如何有效缩短 AI 智能体落地路径?
答: 遵循“从小到大”原则。先在 CozeDify 验证逻辑闭环,确认有效后再迁移至 LangGraph 进行深度定制。

Q2:有哪些核心的 Agent 工作流优化技巧?

  • 引入反思节点: 对每个 Action 结果进行置信度评分。
  • 长短记忆分离: 滑动窗口维护状态,向量索引调用历史。
  • 动态路径切换: 赋予模型根据反馈跳过步骤或回溯的权限。

六、 参考文献与权威索引 (References)

  1. Gartner: Top Strategic Technology Trends for 2026
  2. McKinsey: The state of AI in 2025
  3. Stanford HAI: AI Index Report 2025
  4. LangGraph Docs: State Machine Framework

前言

在过去一年里,我们见证了LLM (大语言模型) 爆发式的增长,LLM的能力有了质的飞跃,也颠覆了所有开发者对“软件能力边界”的认知。只需要几行代码,调用一次LLM api接口,模型就能帮你写一段看起来像模像样的代码、总结一份结构清晰的文档或者回答一些“看起来很聪明”的问题。但当你试图想构建一个稳定、可复用、复杂的生产级别的AI应用时就会遇到

  • Prompt 失控

    一开始只是几行提示词,后来变成了几百行规则说明。你不断往 Prompt 里“打补丁”,但模型依然会在某个边缘场景下给你一个完全不可用的结果。

  • 结果不可预测(Non-deterministic)

    LLM 是概率模型,而业务系统追求的是确定性。

    “大概率对”在 Demo 阶段可以接受,但在审批、风控、数据查询这类场景中,等同于事故隐患。

  • AI应用开发周期长,不能复用

    新做一个 AI 应用,往往要重新写一套流程代码,反复的在从零开始造轮子。

  • 调试困难

    当用户反馈“刚刚还能用,现在不行了”,你却无法复现。

    你不知道当时:

    • Prompt 最终渲染成了什么
    • 模型具体返回了哪一步异常
    • 是模型波动,还是上下游数据变化

这些问题叠加在一起,会把一个原本看起来“很有前途”的 AI 项目,迅速拖入不可维护的深渊。这也是AIWorks平台诞生的初衷,AIWorks不仅仅是一个简单的低代码开发工具,它是一个确定性的编排系统。本文将从工程师的角度带你了解一下AIWorks平台中workflow的设计与实现。

核心设计哲学

我们将Workflow引擎的设计,收敛为四个核心原则

DAG为骨架

复杂的业务逻辑,如果不加整理,往往是一团乱麻的代码(Spaghetti Code)。我们将业务逻辑抽象为数学上的 DAG(有向无环图)

  • Node(节点):代表一个原子的计算单元。它可能是一次 LLM 的推理,也可以是一段 Python 代码的执行,或者是一次 HTTP 请求。
  • Edge(边):代表数据的流向和执行的顺序。

这种设计使得业务逻辑可视化。前端拖拽生成 JSON,后端解析执行 JSON。所见即所得,对非技术人员来说非常的友好。

状态机为灵魂

很多工作流系统,本质上只是任务编排器,节点按顺序执行,执行完就结束。但AI Workflow的行为模式具备以下特征:

  • 多轮交互
  • 上下文强依赖
  • 当前行为取决于“之前发生了什么”

这就要求AI Workflow不是一个简单的线性流程,而是一个状态不断迁移的系统。因此,在 AIWorks 中,我们将工作流视为一个状态机(State Machine),每一次节点执行都会引起 Graph State 的变化,下一步的走向,取决于当前的状态。

一切皆节点

在AIWorks的workflow设计中,节点作为最小执行单元,无论是调用LLM,还是执行一段简单的Python代码,还是调用高德地图API,它们都被抽象成节点。所有节点都继承自同一个基类 BaseNode,Workflow执行引擎不关心节点“做了什么”,只关心节点是否成功,产出了什么结果。这样可以极大的提高系统的扩展能力,如果需要新增一种新能力(比如给飞书发消息),那么你只需要继承BaseNode,然后实现其中对应的方法就能无缝接入到现有的系统中,享受工作流引擎提供的所有能力(重试、 日志、变量注入等)。

可观测性优先

不是“出问题了再加日志”,而是“天生可被回放”。为了让AI Workflow系统不再是“黑盒”,我们将可观测性作为核心设计原则之一。

引擎会自动记录工作流中每一个节点(Node)的完整执行快照,包括:

  • 输入(Inputs):节点接收到的变量和参数。
  • 输出(Outputs):节点运行后的产出结果。
  • 状态变化(Status Changes):从开始、运行中到结束的每一刻。

这种精细粒度的记录,使得开发者可以在工作流执行完成后,像看“即时回放”一样,逐帧查看执行过程。一旦出现问题,通过回溯输入输出,就能迅速定位是哪个环节的 Prompt 写得不对,还是哪个 Tool 调用参数传错了,真正做到“有迹可查”。

Workflow引擎架构解析

整体分层

AIWorks 工作流引擎采用了经典的分层架构:

应用层(Application Layer)
    ↓
图引擎层(Graph Engine Layer)
    ↓
节点层(Node System Layer)
    ↓
基础设施层(Infrastructure Layer)

每一层的职责都很明确:

  • 应用层:提供 API 接口,处理用户请求
  • 图引擎层:负责工作流的编排和执行
  • 节点层:实现各种能力单元(LLM、工具、知识检索等)
  • 基础设施层:提供底层服务(模型调用、向量检索、工具运行时等)

这样分层的好处是关注点分离。比如你要换个 LLM 提供商,只需要改基础设施层;要加个新节点类型,只需要在节点层扩展,不会影响引擎核心逻辑。

Graph Engine:整个系统的“心脏”

GraphEngine是 AIWorks 工作流引擎的核心调度器,它的职责可以概括为三件事:

  • 解析前端传入的工作流JSON
  • 将其编译为可执行的 Graph App
  • 驱动整个执行生命周期(初始化状态、执行Graph、输出结果并保存状态)

1. 静态图构建:业务逻辑的“蓝图”

前端通过拖拽或配置生成的流程,本质上是一份 JSON 描述的 DAG。在 AIWorks中,我们并不会直接将这份 JSON 交给执行引擎,而是定义类Graph对其进行装载和校验,并解析JSON中的节点(Node)和边(Edge)。GraphEngine使用LangGraph作为工作流的执行引擎,GraphEngine将 Graph 对象转换为 LangGraph 提供的 StateGraph,并将Graph中的节点(Node)和边(Edge)动态设置到StateGraph中。

简化后的核心逻辑如下:

# 伪代码示意
class GraphEngine:
        def __init__(self, ..., graph: Graph):
        self._graph = graph
        ...
    def _build_graph_app(self, state_context):
            workflow = StateGraph(GraphRuntimeState)
            node_id_config_mapping = self._graph.node_id_config_mapping
            
            # 1. 动态添加节点
            for node_id, node_data in node_config_mapping.items():
                wrapper_node = self._create_command_node(node_id, node_data, state_context)
                workflow.add_node(node_id, wrapper_node)
            # 2. 动态添加边
            for edge in edges:
                workflow.add_edge(edge.source, edge.target)
                
            return workflow.compile()

StateGraph设置完成后,会调用compile()进行编译,在这个阶段会对整个图进行结构校验,包括是否存在环、是否有孤立节点或不可达节点和节点和边的引用是否完整等。

2. 异步调度与事件流

GraphEngine的执行核心并非简单的循环,而是基于 LangGraph 提供的 astream 接口实现的异步事件流处理。事件类型包括FLOW_START、NODE_START、NODE_END、FLOW_END。

# 伪代码示意
async def _run_workflow(self, inputs: GraphGenerateEntity, enable_run_log: bool):
    graph_app = self._build_graph_app(state_context)
    state = self._init_graph_state(inputs)
    # ... 初始化状态 ...
    yield GraphEvent(event=GraphEventEnum.FLOW_START, run_id=run_id)
    
    # 订阅 LangGraph 的流式输出,关注 "custom" (自定义事件) 和 "tasks" (节点状态)
    event_stream = graph_app.astream(state, stream_mode=["custom", "tasks"])
    
    async for event_tuple in event_stream:
        stream_mode, event_message = event_tuple
        
        # 将 LangGraph 的内部事件转换为 aiworks 的标准 GraphEvent
        if stream_mode == "tasks":
            # 处理节点启动/结束
            yield GraphEvent(event=GraphEventEnum.NODE_START, ...)
        else:
            # 透传自定义事件 (如 LLM Token)
            yield GraphEvent(**event_message)
            
    yield GraphEvent(event=GraphEventEnum.FLOW_END, ...)

3. 状态持久化

在工作流执行结束后,引擎会自动进行“快照存档”。这种设计不仅仅是保存 Log,它保存了完整的运行时状态(GraphRuntimeState)。这意味着:

  • 可溯源:你可以随时打开一个历史运行记录,看到当时图的结构(哪怕现在的图已经改了)以及每个节点的输入输出。
  • 可恢复(未来及展望):这种结构为未来的“断点续跑”和“人工介入”功能打下了数据基础。

状态管理(State):可观测的执行过程

工作流执行过程中,最重要的就是状态管理。我们设计了三层状态结构:

1. 变量池(VariablePool)

这是整个工作流的"记忆",存储所有变量:

class VariablePool(BaseModel):
    user_inputs: dict  # 用户输入的变量
    system_inputs: dict  # 系统变量(如 conversation_id)
    pool: dict  # 节点执行过程中产生的变量

节点执行时,可以从pool中读取前置节点的输出,也可以把自己的输出写入pool,供后续节点使用。

2. 节点状态(NodeState)

记录每个节点的执行状态:

class NodeState(BaseModel):
    id: str
    status: NodeStatus  # pending/running/succeeded/failed
    inputs: dict  # 节点输入
    result: dict  # 节点输出
    start_at: datetime
    finished_at: datetime
    error_msg: str

这里有个细节:我们会完整记录节点的输入和输出。这样做的好处是:

  • 执行回放:根据 inputs 可以重新执行节点,复现问题
  • 调试:可以清楚看到每个节点的输入输出,快速定位问题
  • 性能分析:通过 start_at 和 finished_at 计算耗时,找出瓶颈

3. 工作流运行时状态(GraphRuntimeState)

整个工作流的全局状态:

classGraphRuntimeState(BaseModel):
    query:str
    variable_pool: VariablePool
    node_state_mapping: dict[str, NodeState] # 所有节点状态
    routes: dict[str, list[str]] # 实际执行路径
    status: GraphStatus # running/succeeded/failed
    output:str|dict # 最终输出

执行完成后,我们会把GraphRuntimeState 序列化成 JSON,存储到数据库。这样就有了完整的执行记录,方便后续分析和优化。

节点系统:可扩展的能力单元

如果说图引擎是工作流的"大脑",那节点系统就是"四肢"——具体干活的地方。

1. 节点抽象:模板方法模式的实践

所有节点都继承自BaseNode,它定义了节点的生命周期:

class BaseNode:
    def __init__(self, node_id, node_data, user_id, tenant_id, graph):
        self.node_id = node_id
        self.node_data = node_data
        self.init_node()
    
    def init_node(self):
        config = self.resolve_node_data()
        self.init_node_config(config)
    
    @abstractmethod
    def _run(self, state) -> NodeResult:
        raise NotImplementedError
    
    async def run(self, state):
        return await self._run(state)

这是典型的模板方法模式

  • init_node() 定义了初始化流程
  • 子类只需要实现 init_node_config() 和 _run() 两个方法

这样做的好处是统一了节点的初始化流程,子类只需要关注自己的核心逻辑。

2. 节点概览:工作流的能力单元

Start 节点:工作流的入口节点,标识整个流程的起点,每个工作流都必须有一个 Start 节点,它负责接收用户的输入变量,并将它们传递给后续节点。

LLM 节点:调用大语言模型进行文本生成、对话、摘要等任务。这是使用最频繁的节点,支持友好的提示词管理、记忆管理、多种LLM提供商等。

Knowledge Retrieval 节点:从向量数据库中检索相关知识文档。典型应用场景是 RAG(检索增强生成),先检索相关知识,再传给 LLM 节点进行回答。

Tool 节点:工具节点是与外部环境交互的接口,支持内置工具、API工具和自定义工具。

IF-Else 节点:根据条件动态选择执行路径, 是实现复杂业务逻辑的关键。支持:

  • 多种比较操作符(等于、包含、为空等)
  • AND / OR 逻辑组合
  • 多分支(IF / ELIF / ELSE)
  • 基于变量池中的任意变量做判断

Code 节点:执行用户自定义的代码逻辑。适合处理复杂的数据转换、计算逻辑等 LLM 不擅长的任务。

HTTP 节点:发起 HTTP 请求,调用外部 API。

Answer 节点:格式化最终输出,返回给用户。

3. 扩展新节点:三步走

当我们需要新增节点类型时, 流程很简单:

1)定义配置类:继承类BaseNodeConfig

lass MyNodeConfig(BaseNodeConfig):
    param1: str
    param2: int

2)实现节点类:继承BaseNode

class MyNode(BaseNode):
    _node_type = NodeType.MY_NODE
    
    def init_node_config(self, valid_config):
        self.node_config = MyNodeConfig(**valid_config)
    
    def _run(self, state):
        # 实现具体逻辑
        return NodeResult(result={...})

3)注册节点类型:在 NodeType 枚举和工厂方法中注册

这个设计让节点系统具备了很强的扩展性,每个节点并遵守单一原则,这样就能保证新增节点时效率高,还能保持引擎代码的稳定。

总结

回顾整个工作流引擎的开发过程,最大的感受是:好的架构设计真的能事半功倍

我们在 AIWorks 中遵循的几个核心理念:

  • 分层解耦:图定义、执行引擎、节点实现各自独立
  • 抽象优先:基于接口编程,易于扩展
  • 可观测性:完整记录执行链路,方便调试和优化
  • 异步流式:提升用户体验和系统吞吐

目前这套系统已经在生产环境中稳定运行,支撑了多个企业级应用。当然,还有很多可以优化的地方:

  • 可视化调试器:图形化展示执行流程和状态变化
  • 分布式执行:支持大规模工作流的分布式调度
  • 智能优化:基于历史数据,自动优化节点配置

希望这篇文章能对正在做类似系统的同学有所帮助。如果你有任何问题或建议,欢迎留言交流!

一、概述

LangGraph是LangChain团队开发的低级别编排框架,专为构建、管理和部署长时间运行的有状态AI代理设计,提供持久化执行、灵活控制流和全面内存管理功能,支持循环和条件分支,是开发复杂AI工作流的理想选择。

1.1 核心特点

  • 持久执行:自动保存执行状态,支持故障恢复和断点续跑
  • 循环与分支:突破传统DAG限制,支持复杂的条件判断和循环逻辑
  • 全面内存:集成短期工作内存和长期持久内存,支持跨会话状态保留
  • 人机协作:内置中断机制,允许人工介入审批或修改代理行为
  • 流支持:实时输出执行结果,包括LLM的token级流式响应
  • 可观察性:无缝集成LangSmith,提供完整的执行轨迹和状态转换可视化

二、核心概念

2.1 状态(State)

共享内存,所有节点都可读写的全局数据结构,是代理的"工作记忆"。

  • 定义为Python的TypedDict或dataclass,包含代理需要的所有信息
  • 存储原始数据而非格式化文本,确保不同节点可灵活使用
  • 示例:

    from typing import TypedDict
    class AgentState(TypedDict):
        messages: list  # 对话消息列表
        search_results: list  # 搜索结果
        user_preferences: dict  # 用户偏好

2.2 节点(Nodes)

图的基本执行单元,是接收状态并返回更新的函数。

  • 类型:

    • LLM节点:调用语言模型进行文本理解或生成
    • 工具节点:执行外部API调用、数据库查询等
    • 数据处理节点:转换或分析数据
    • 人工介入节点:暂停执行等待用户输入
  • 定义示例:

    def greet(state: AgentState) -> dict:
        return {"greeting": f"Hello, {state['user_name']}!"}

2.3 边(Edges)

节点间的连接,定义执行流的路径。

  • 普通边:始终执行固定路径
  • 条件边:根据状态决定下一步执行节点
  • 定义示例:

    # 普通边:从"start"到"greet"
    graph.add_edge("start", "greet")
    
    # 条件边:根据状态判断是执行"search"还是"reply"
    def decide_next(state: AgentState) -> str:
        return "search" if state["needs_info"] else "reply"
    graph.add_conditional_edges("greet", decide_next)

三、架构与工作原理

3.1 图结构

LangGraph使用有向图模型表示代理工作流,包含:

  • 特殊节点

    • START:执行入口点
    • END:执行结束点
  • 执行模型:基于"消息传递"的迭代执行,以离散"超步骤"(super-step)推进

3.2 状态管理

  • 短期内存:线程范围内存,随执行结束自动清除
  • 长期内存

    • 存储于独立的Store系统,支持跨会话、跨线程访问
    • 使用namespacekey组织数据,类似文件系统的目录和文件名
    • 支持多种存储后端:内存(开发)、PostgreSQL(生产)、Redis等

3.3 执行流程

  1. 初始化状态并设置入口节点
  2. 执行入口节点,更新状态
  3. 根据边的类型(普通/条件)决定下一节点
  4. 重复直到到达END或达到递归限制(默认25步)
  5. 执行过程中自动保存检查点,支持故障恢复

四、存储方案

LangGraph支持多种存储后端,满足不同场景需求:

存储类型适用场景特点配置示例
InMemoryStore开发测试速度快,无持久化store = InMemoryStore()
PostgresStore生产环境高可靠,支持事务store = PostgresStore("postgresql://user:pass@host/db")
RedisStore分布式系统高性能读写,适合缓存store = RedisStore("redis://host:port")
SQLiteStore轻量级应用文件存储,无需服务器store = SQLiteStore("langgraph.db")

长期记忆配置

from langgraph.store.postgres import PostgresStore
from langgraph.backends import CompositeBackend, StateBackend, StoreBackend

# 配置复合存储:/memories/路径下的数据持久化,其他临时存储
def make_backend(runtime):
    return CompositeBackend(
        default=StateBackend(runtime),  # 临时存储
        routes={"/memories/": StoreBackend(runtime, PostgresStore("..."))}  # 持久存储
    )

五、使用方法

5.1 安装

pip install -U langgraph  # Python版本
npm install @langchain/langgraph  # JavaScript版本

5.2 基本使用步骤

1. 定义状态

from typing import TypedDict
class ChatState(TypedDict):
    messages: list  # 对话消息列表

2. 构建图

from langgraph.graph import StateGraph, START, END
from langchain.llms import OpenAI

# 初始化图
graph = StateGraph(ChatState)

# 定义节点:调用LLM生成回复
def call_llm(state: ChatState):
    llm = OpenAI(temperature=0)
    response = llm.invoke(state["messages"])
    return {"messages": state["messages"] + [response]}

# 添加节点和边
graph.add_node("generate_response", call_llm)
graph.add_edge(START, "generate_response")
graph.add_edge("generate_response", END)

3. 编译并执行

# 编译为可执行应用
app = graph.compile()

# 执行
initial_state = {"messages": [{"role": "user", "content": "Hello!"}]}
final_state = app.invoke(initial_state)
print(final_state["messages"][-1]["content"])  # 输出AI回复

5.3 条件执行与循环

# 定义条件函数:检查是否需要调用工具
def needs_tool(state: ChatState) -> Literal["use_tool", "reply"]:
    last_message = state["messages"][-1]
    return "use_tool" if last_message.get("tool_calls") else "reply"

# 添加条件边
graph.add_conditional_edges("generate_response", needs_tool)

# 添加工具节点和循环边
graph.add_node("use_tool", tool_node)
graph.add_edge("use_tool", "generate_response")  # 循环回LLM节点

六、API参考

6.1 Graph API

核心类

  • StateGraph:构建状态驱动的图,需传入状态类型
  • MessageState:预定义的消息状态,适合聊天应用
  • Checkpointer:管理执行状态的保存和恢复

关键方法

  • add_node(name, function, **kwargs):添加节点,支持重试策略等配置
  • add_edge(from_node, to_node):添加普通边
  • add_conditional_edges(from_node, condition_func):添加条件边
  • compile(checkpointer=None):编译图为可执行应用,支持持久化配置
  • invoke(input_state, config=None):执行图,返回最终状态

6.2 Functional API (简化版)

提供更简洁的方式构建小型工作流:

from langgraph import entrypoint, task

@entrypoint
def my_agent():
    state = {"counter": 0}
    while state["counter"] < 3:
        state = task(increment)(state)  # 调用任务函数
    return state

@task
def increment(state):
    state["counter"] += 1
    return state

result = my_agent()  # 执行

七、开发指南

7.1 构建步骤

  1. 设计工作流:将问题分解为离散步骤,确定节点间依赖关系
  2. 定义状态:确定需要在步骤间共享的数据
  3. 实现节点:为每个步骤编写函数,处理输入状态并返回更新
  4. 连接节点:使用边定义执行顺序,添加必要的条件判断
  5. 添加内存:配置检查点和持久化,实现长期记忆
  6. 测试与调试:使用LangSmith可视化执行过程,检查状态转换

7.2 最佳实践

状态设计

  • 只存储必要信息,避免冗余
  • 保持状态原始,在节点内格式化输出
  • 使用描述性键名,提高可读性

节点设计

  • 单一职责:每个节点专注做一件事
  • 错误处理:为不同错误类型设置适当的处理策略(重试/回退/人工介入)
  • 外部调用:将API调用、数据库操作等封装为独立节点,便于添加重试和监控

内存管理

  • 短期数据存于状态,长期数据使用专用存储
  • 定期清理过时数据,优化存储性能
  • 使用命名空间组织长期数据,便于管理和查询

八、调试与监控

8.1 使用LangSmith集成

LangGraph无缝集成LangSmith,提供全面的可观察性:

# 启用LangSmith追踪
import os
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "..."

# 编译图时启用追踪
app = graph.compile(checkpointer=checkpointer, trace=True)

监控功能

  • 执行轨迹可视化:查看完整执行路径和状态变化
  • 性能分析:测量各节点执行时间,识别瓶颈
  • 异常检测:自动标记执行错误和异常路径
  • 交互式调试:在LangSmith Studio中检查中间状态

8.2 本地调试技巧

  • 断点打印:在节点函数中添加print语句,输出关键状态
  • 分步执行:使用graph.invoke并传入小输入,逐步验证每个节点
  • 错误处理:为节点添加详细的异常捕获和日志记录:

    def safe_node(state):
        try:
            # 正常逻辑
        except Exception as e:
            return {"error": str(e)}  # 返回错误信息而非崩溃

九、部署方案

9.1 自托管部署

使用Docker

# 安装CLI
pip install -U langgraph-cli

# 构建镜像
langgraph build --name my-agent .

# 运行
docker run -p 8124:8124 my-agent

生产配置建议

  • 使用PostgreSQL作为存储后端,确保数据持久化
  • 配置数据加密,保护敏感信息
  • 设置适当的资源限制,防止滥用
  • 使用负载均衡和水平扩展,提高吞吐量

9.2 LangSmith Cloud (原LangGraph Platform)

提供一键式云部署:

  • Lite版本:免费使用,每年限制100万节点执行
  • Enterprise版本:全功能支持,适合大规模生产环境

优势

  • 自动扩展和高可用性
  • 内置监控和告警系统
  • 开箱即用的安全与合规功能
  • 与LangSmith无缝集成,提供完整的可观察性

十、完整示例:构建天气查询代理

# 1. 安装依赖
pip install langgraph langchain openai

# 2. 导入必要模块
from typing import TypedDict, Literal
from langchain.llms import OpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver  # 内存检查点

# 3. 定义状态
class WeatherAgentState(TypedDict):
    messages: list  # 对话消息
    location: str  # 查询的城市
    weather_info: str  # 天气信息

# 4. 定义工具函数
def get_weather(location: str) -> str:
    """简化的天气查询API"""
    if location.lower() == "sf":
        return "60°F, foggy"
    elif location.lower() == "ny":
        return "90°F, sunny"
    else:
        return "Weather data not available for this location"

# 5. 定义节点
def initial_prompt(state: WeatherAgentState) -> dict:
    """询问用户想查询哪个城市的天气"""
    llm = OpenAI(temperature=0)
    response = llm.invoke([HumanMessage(content="Which city's weather would you like to check?")])
    return {"messages": [response]}

def parse_location(state: WeatherAgentState) -> dict:
    """从用户消息中提取城市名"""
    last_message = state["messages"][-1]
    location = last_message.content.strip().lower()
    return {"location": location, "messages": state["messages"] + [AIMessage(content=f"Checking weather for {location}...")]}

def get_weather_info(state: WeatherAgentState) -> dict:
    """调用天气工具获取信息"""
    weather = get_weather(state["location"])
    return {"weather_info": weather, "messages": state["messages"] + [AIMessage(content=f"Weather in {state['location']}: {weather}")]}

# 6. 构建图
graph = StateGraph(WeatherAgentState)

# 添加节点
graph.add_node("initial_prompt", initial_prompt)
graph.add_node("parse_location", parse_location)
graph.add_node("get_weather_info", get_weather_info)

# 添加边定义执行流
graph.add_edge(START, "initial_prompt")
graph.add_edge("initial_prompt", "parse_location")
graph.add_edge("parse_location", "get_weather_info")
graph.add_edge("get_weather_info", END)

# 7. 添加内存支持
checkpointer = MemorySaver()  # 使用内存检查点保存状态
app = graph.compile(checkpointer=checkpointer)

# 8. 执行代理
first_run = app.invoke({})
print("First run output:")
for msg in first_run["messages"]:
    print(f"{msg['role'].capitalize()}: {msg['content']}")

print("\nSecond run (with state persistence):")
# 第二次执行会保留之前的对话状态
second_run = app.invoke({})
for msg in second_run["messages"]:
    print(f"{msg['role'].capitalize()}: {msg['content']}")

十一、总结

LangGraph是构建复杂AI代理的强大框架,通过状态驱动的图结构,提供了持久执行、灵活控制流和全面内存管理能力。使用LangGraph,开发者可以轻松构建具有记忆、能够处理复杂逻辑的AI代理,适用于客服、研究助手、自动化工作流等多种场景。

本系列介绍增强现代智能体系统可靠性的设计模式,以直观方式逐一介绍每个概念,拆解其目的,然后实现简单可行的版本,演示其如何融入现实世界的智能体系统。本系列一共 14 篇文章,这是第 14 篇。原文:Building the 14 Key Pillars of Agentic AI

优化智能体解决方案需要软件工程确保组件协调、并行运行并与系统高效交互。例如预测执行,会尝试处理可预测查询以降低时延,或者进行冗余执行,即对同一智能体重复执行多次以防单点故障。其他增强现代智能体系统可靠性的模式包括:

  • 并行工具:智能体同时执行独立 API 调用以隐藏 I/O 时延。
  • 层级智能体:管理者将任务拆分为由执行智能体处理的小步骤。
  • 竞争性智能体组合:多个智能体提出答案,系统选出最佳。
  • 冗余执行:即两个或多个智能体解决同一任务以检测错误并提高可靠性。
  • 并行检索和混合检索:多种检索策略协同运行以提升上下文质量。
  • 多跳检索:智能体通过迭代检索步骤收集更深入、更相关的信息。

还有很多其他模式。

本系列将实现最常用智能体模式背后的基础概念,以直观方式逐一介绍每个概念,拆解其目的,然后实现简单可行的版本,演示其如何融入现实世界的智能体系统。

所有理论和代码都在 GitHub 仓库里:🤖 Agentic Parallelism: A Practical Guide 🚀

代码库组织如下:

agentic-parallelism/
    ├── 01_parallel_tool_use.ipynb
    ├── 02_parallel_hypothesis.ipynb
    ...
    ├── 06_competitive_agent_ensembles.ipynb
    ├── 07_agent_assembly_line.ipynb
    ├── 08_decentralized_blackboard.ipynb
    ...
    ├── 13_parallel_context_preprocessing.ipynb
    └── 14_parallel_multi_hop_retrieval.ipynb

深度推理的多跳检索

许多复杂的用户查询并非单一问题,而是比较性的、多步骤的调研任务,需要从多个不同来源的文档中综合信息。

并行多跳

解决方案是 并行多跳检索(Parallel Multi-Hop Retrieval) 架构,这种模式将 RAG 系统提升为真正的调研代理,工作流模拟人类研究员如何处理复杂问题的过程:

  1. 分解(Decompose):高级元代理首先分析复杂的用户查询,将其分解为几个更简单、独立的子问题。
  2. 分散(并行检索):每个子问题都被派发给各自的专用检索代理。这些代理并行运行,每个代理执行标准 RAG 流程,为特定子问题寻找答案。
  3. 收集与综合:元代理收集所有子问题的答案,进行最终推理步骤,将它们综合为对原始复杂查询的单一、全面的答案。

我们将以一个无法通过单一检索回答的比较性问题为例,构建并比较简单 RAG 系统与多跳 RAG 系统,证明只有多跳系统才能成功收集必要的证据,以提供准确且富有洞察力的最终答案。

首先为初始分解步骤定义 Pydantic 模型,从而结构化元代理规划阶段输出的内容。

from langchain_core.pydantic_v1 import BaseModel, Field
from typing import List

class SubQuestions(BaseModel):
    """分解代理输出的Pydantic模型,包含一组独立的子问题"""
    questions: List[str] = Field(description="A list of 2-3 simple, self-contained questions that, when answered together, will fully address the original complex query.")

这个 SubQuestions 模型是元代理首次行动的合约,迫使 LLM 将复杂查询分解为一系列简单、可回答的问题,是并行"分而治之"策略的基础步骤。

然后构建高级多跳系统作为 LangGraph 图。第一个节点将是"分解器",即元代理的规划角色。

from typing import TypedDict, List, Dict, Annotated
import operator

class MultiHopRAGState(TypedDict):
    original_question: str
    sub_questions: List[str]
    # 字典以问题作为键,存储每个子问题的答案
    sub_question_answers: Annotated[Dict[str, str], operator.update]
    final_answer: str

# 节点 1:分解器(元代理的第一步)
decomposer_prompt = ChatPromptTemplate.from_template(
    "You are a query decomposition expert. Your job is to break down a complex question into simple, independent sub-questions that can be answered by a retrieval system. "
    "Do not try to answer the questions yourself.\n\n"
    "Question: {question}"
)

decomposer_chain = decomposer_prompt | llm.with_structured_output(SubQuestions)

def decomposer_node(state: MultiHopRAGState):
    """获取原始复杂问题并将其分解为子问题列表"""
    print("--- [Meta-Agent] Decomposing complex question... ---")
    result = decomposer_chain.invoke({"question": state['original_question']})
    print(f"--- [Meta-Agent] Generated {len(result.questions)} sub-questions. ---")
    return {"sub_questions": result.questions}

decomposer_node 是研究代理的战略大脑,它不会尝试回答查询,其唯一且关键的任务是分析用户意图并将其分解为一组独立、可并行化的研究任务。

下一个节点将并行为每个子问题协调执行标准的 RAG 流程。

from concurrent.futures import ThreadPoolExecutor, as_completed

# 标准、自包含的RAG链,是并行检索代理的“引擎”
sub_question_rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | generator_prompt
    | llm
    | StrOutputParser()
)

def retrieval_agent_node(state: MultiHopRAGState):
    """节点 2:为每个子问题并行运行完整 RAG 进程"""
    print(f"--- [Retrieval Agents] Answering {len(state['sub_questions'])} sub-questions in parallel... ---")
    
    answers = {}
    # 用 ThreadPoolExecutor 对每个子问题并发运行‘sub_question_rag_chain’
    with ThreadPoolExecutor(max_workers=len(state['sub_questions'])) as executor:
        # 为每个待回答子问题构建一个 future
        future_to_question = {executor.submit(sub_question_rag_chain.invoke, q): q for q in state['sub_questions']}
        for future in as_completed(future_to_question):
            question = future_to_question[future]
            try:
                answer = future.result()
                answers[question] = answer
                print(f"  - Answer found for sub-question: '{question}'")
            except Exception as e:
                answers[question] = f"Error answering question: {e}"
    # 将结果收集到“sub_question_answers”字典中
    return {"sub_question_answers": answers}

retrieval_agent_node 是系统中的分散-聚合核心,接收 sub_questions 列表,并用 ThreadPoolExecutor 将每个条目分配到各自独立的 RAG 链。这是一种强大的并行形式,同时运行多个完整 RAG 流程。在所有并行代理找到答案后,该节点将所有发现汇总到 sub_question_answers 字典中。

最后,“合成器”节点作为元代理的最终步骤,将并行发现整合为一个连贯的答案。

# 节点 3:合成器(元代理的最后一步)
synthesizer_prompt = ChatPromptTemplate.from_template(
    "You are a synthesis expert. Your job is to combine the answers to several sub-questions into a single, cohesive, and comprehensive answer to the user's original complex question.\n\n"
    "Original Question: {original_question}\n\n"
    "Sub-Question Answers:\n{sub_question_answers}"
)

synthesizer_chain = synthesizer_prompt | llm | StrOutputParser()

def synthesizer_node(state: MultiHopRAGState):
    """获取子问题的答案,并合成最终的全面答案"""
    print("--- [Meta-Agent] Synthesizing final answer... ---")
    
    # 将收集的子问题答案格式化为最终提示
    sub_answers_str = "\n".join([f"- Q: {q}\n- A: {a}" for q, a in state['sub_question_answers'].items()])
    
    final_answer = synthesizer_chain.invoke({
        "original_question": state['original_question'],
        "sub_question_answers": sub_answers_str
    })
    return {"final_answer": final_answer}

synthesizer_node 是至关重要的最终推理步骤,它本身不执行任何检索,任务是接收 sub_question_answers 中的预处理事实,并将其构造为能直接回应用户原始复杂查询的连贯叙述。

最后按线性顺序组装图:分解 -> 并行检索 -> 综合。

from langgraph.graph import StateGraph, END

workflow = StateGraph(MultiHopRAGState)
workflow.add_node("decompose", decomposer_node)
workflow.add_node("retrieve_in_parallel", retrieval_agent_node)
workflow.add_node("synthesize", synthesizer_node)

workflow.set_entry_point("decompose")

workflow.add_edge("decompose", "retrieve_in_parallel")
workflow.add_edge("retrieve_in_parallel", "synthesize")
workflow.add_edge("synthesize", END)
multi_hop_rag_app = workflow.compile()

并行多跳检索

给两个系统一个复杂且需要比较的问题,这个问题无法通过单次检索调用正确回答,从而对比分析两种查询方式。

# 查询需要比较两个产品,信息在独立、不重叠的文档中
user_query = "Compare the QLeap-V4 and the Eco-AI-M2, focusing on their target use case and power consumption."

# --- 执行简单 RAG ---
print("="*60)
print("                  SIMPLE RAG SYSTEM OUTPUT")
print("="*60 + "\n")
print(f"Final Answer:\n{simple_answer}")

# --- 执行多跳 RAG ---
print("\n" + "="*60)
print("                 MULTI-HOP RAG SYSTEM OUTPUT")
print("="*60 + "\n")
print("--- Sub-Question Answers ---")
for i, (q, a) in enumerate(multi_hop_result['sub_question_answers'].items()):
    print(f"{i+1}. Q: {q}\n   A: {a}")
print("\n--- Final Synthesized Answer ---")
print(multi_hop_result['final_answer'])

# --- 最终分析 ---
print("\n" + "="*60)
print("                     ACCURACY & QUALITY ANALYSIS")
print("="*60 + "\n")
print("**Simple RAG Performance:**")
print("- Result: COMPLETE FAILURE.")
print("- Reason: The user's query contained terms for both products. Vector search found the documents that were, on average, most semantically similar to the entire query, retrieving only documents about the Eco-AI-M2. It completely failed to retrieve any information about the QLeap-V4. Without the necessary context for both products, a comparison was impossible.\n")
print("**Multi-Hop RAG Performance:**")
print("- Result: COMPLETE SUCCESS.")
print("- Reason: The system's intelligence was in the initial decomposition step. The Meta-Agent broke the complex comparative query into two simple, focused sub-questions: 1. Get info on Product A. and 2. Get info on Product B. The parallel Retrieval Agents had no trouble answering these simple questions, each retrieving the correct, focused context. The final Synthesizer agent then received a perfect, complete set of facts about both products, making the final comparison trivial.")

输出为……

#### 输出 ####
============================================================
                  SIMPLE RAG SYSTEM OUTPUT
============================================================

Final Answer:
Based on the provided context, the Eco-AI-M2 chip is designed for edge computing and mobile devices, with a primary feature of low power consumption at only 15W under full load. The context does not contain information about the QLeap-V4, so I cannot provide a comparison.

============================================================
                 MULTI-HOP RAG SYSTEM OUTPUT
============================================================
--- Sub-Question Answers ---
1. Q: What is the target use case and power consumption of the QLeap-V4?
   A: The QLeap-V4 processor is designed for maximum performance in data centers, with a primary use case of large-scale AI model training. It consumes 1200W of power under full load.
2. Q: What is the target use case and power consumption of the Eco-AI-M2?
   A: The Eco-AI-M2 chip is designed for edge computing and mobile devices like drones and smart cameras. Its key feature is low power consumption, drawing only 15W under full load.
--- Final Synthesized Answer ---
The QLeap-V4 and the Eco-AI-M2 are designed for very different purposes, primarily distinguished by their target use case and power consumption.
-   **QLeap-V4**: This is a high-performance processor intended for data centers. Its main use case is large-scale AI model training, and it has a high power consumption of 1200W.
-   **Eco-AI-M2**: This is a low-power chip designed for edge computing and mobile devices. Its focus is on energy efficiency, consuming only 15W, making it suitable for applications like drones and smart cameras.

最终分析得出明确结论,性能差异并非渐进式,而是一次能力上的飞跃。

  • 单次检索步骤无法解决比较查询歧义,仅检索了两个产品中的一个上下文,从根本上无法收集必要的证据。
  • 多跳系统之所以成功,是因为没有试图一次性回答复杂问题,而是识别了查询的比较性质,并将问题分解。
  • 通过并行、专注的 RAG 代理来解决每个简单的子问题,确保收集了所有必要证据,最后的综合步骤只是简单的将预先处理的事实结合起来。

Hi,我是俞凡,一名兼具技术深度与管理视野的技术管理者。曾就职于 Motorola,现任职于 Mavenir,多年带领技术团队,聚焦后端架构与云原生,持续关注 AI 等前沿方向,也关注人的成长,笃信持续学习的力量。在这里,我会分享技术实践与思考。欢迎关注公众号「DeepNoMind」,星标不迷路。也欢迎访问独立站 www.DeepNoMind.com,一起交流成长。

本文由mdnice多平台发布

项目介绍:

基于 Django、langgraph、langchain 开发的 AI 自动化测试平台。

目前功能:

1、需求文档智能评审并指正需要改正的问题。
2、AI 根据知识库和需求文档生成测试用例。
3、自然语言用例执行并生成对应的 playwright 自动化脚本。
4、执行用例时自动截图上传会平台。
5、批量执行功能测试用例和 playwright 脚本并生成对应的报告。

效果展示



如果项目对你有帮助,请帮我点点 star ~

项目地址: MGdaasLab/WHartTest: WHartTest 是基于 Django REST Framework 与现代大模型技术打造的 AI 驱动测试自动化平台。平台聚合自然语言理解、知识库检索与嵌入搜索能力,结合 LangChain 与 MCP(Model Context Protocol) 工具调用,实现从需求到可执行测试用例的自动化生成与管理,帮助测试团队提升效率与覆盖率。

声明:

本人纯编程小白,项目由本人提供思路,AI 负责功能代码实现。(也算是站在巨人的肩膀上了)


📌 转载信息
原作者:
duanxc
转载时间:
2025/12/30 18:07:51

佬友们,分享一下我最近开发的 Excel 智能数据分析软件–ExcelMind
算是在 L 站第一次分享我的开源项目,希望佬友们多多提 Issue,多多 Star。

下面是演示视频:

这个项目是基于 LangGraph 开发的,支持自然语言查询、多轮对话、流式输出、ECharts 图表可视化 和可视化思考过程。

GitHub 地址:GitHub - stark-456/ExcelMind: AI 智能分析 Excel 文件,对话式完成多场景 Excel 分析任务,解决 Excel 报表分析复杂、效率低等痛点

上传 Excel 文件后,我们可以用自然语言跟 AI 对话,AI 会自主决策,自主调用工具,完成 Excel 的分析任务。
原理呢,其实是只给 AI 看一部分 Excel 的局部,让 AI 了解表结构之后,调用十个工具来完成分析任务,避免 AI 直接看数据做分析带来幻觉,是我觉得做分析必须要考虑到的。

分析过程可视化:

AI 的分析过程,我都尽可能做了显式的输出,并做了前端优化让工具调用更易于阅读,目的是让 AI 做的每一步都是易于追溯的,这样可以让分析过程摆脱黑盒,让我们对分析过程掌控度更高,即使是出错了,也容易改正。

图表分析:

这里我先加入了 bar (柱状图), line (折线图), pie (饼图),scatter (散点图), radar (雷达图), funnel (漏斗图)。你可以指定 AI 输出什么图,如果不指定,AI 会自主决策输出什么图

知识库

考虑到有时候我们的 Excel 文档有很多 AI 不易理解的字段或信息,我加入了知识库功能,会在每次问答前进行召回,这样,有一些特殊需求我们就可以放在知识库里

智能联表

这个是考虑到有时候需要多表联查,但是很多朋友没有数据库基础,这里选定两张表,可以触发 AI 推荐联表的外键跟连接方式,实测,还是很准的,基本上不用自己去考虑怎么联表方式。

实测,模型能力越强,回答越精准,所以推荐佬们用 Sota 模型,站里很多公益站的模型就很不错!

大概功能就是这些,希望有建议的佬随时互动,我会认真看并改进。
以后会不断开发 AI 智能体项目,并开源给大家,希望多多支持!
GitHub 地址:GitHub - stark-456/ExcelMind: AI 智能分析 Excel 文件,对话式完成多场景 Excel 分析任务,解决 Excel 报表分析复杂、效率低等痛点


📌 转载信息
原作者:
fengling666
转载时间:
2025/12/30 10:20:29