标签 自动化测试 下的文章

随着自动化技术、数据采集和智能测试的普及,“Headless Browser(无头浏览器)”已经从一个偏工程师内部使用的工具,逐渐演变为数据工程、爬虫系统、自动化测试乃至 AI 应用中的核心组件。很多人听说过这个概念,却并不真正理解它的工作原理、真实能力以及在复杂网络环境中的价值。
本文将系统性地解析什么是 Headless Browser,它与传统浏览器的本质区别,以及它在现代互联网应用中的实际意义。

Headless Browser 的基本定义

Headless Browser,直译为“无头浏览器”,这里的“头”指的是图形用户界面。也就是说,它本质上是一个没有可视化界面的浏览器内核,能够像普通浏览器一样解析 HTML、执行 JavaScript、加载 CSS、发起网络请求、处理 Cookie 和本地存储,但所有过程都在后台完成,不向用户展示页面。
从功能角度看,Headless Browser 并不是“阉割版浏览器”,恰恰相反,它在网页渲染和脚本执行层面,与完整浏览器几乎一致,只是去掉了 UI 渲染这一步。
正因为这一特性,它非常适合被程序控制,用于自动化任务。

无头浏览器是如何工作的

理解 Headless Browser 的关键,在于理解现代网页的加载逻辑。
当你用普通浏览器访问一个网站时,背后会经历一整套流程:
浏览器发起请求、接收 HTML、解析 DOM、加载 CSS、执行 JavaScript、请求接口数据、动态更新页面内容。Headless Browser 运行的正是这套完整流程,只是整个过程由代码驱动,而不是由用户点击和操作触发。开发者可以通过脚本控制它打开页面、等待资源加载、模拟滚动、填写表单、点击按钮,甚至执行复杂的交互逻辑。
从服务器的角度看,它看到的并不是一个“工具”,而是一个行为极其接近真实用户的浏览器环境。

Headless Browser 与普通爬虫的根本区别

很多初学者会把 Headless Browser 与传统 HTTP 爬虫混为一谈,但两者在能力层级上有明显差异。
传统爬虫更多依赖直接请求接口或页面源代码,适合结构简单、反爬较弱的网站。但面对大量使用前端框架、动态渲染、接口签名和行为校验的网站时,传统爬虫往往寸步难行。
Headless Browser 的优势在于,它可以完整执行前端逻辑,获取最终渲染后的真实页面状态。这意味着即使网站内容完全由 JavaScript 动态生成,也依然可以被正确获取。
在很多复杂网站场景中,无头浏览器已经成为“唯一可行方案”。

为什么 Headless Browser 会被重点风控

正因为 Headless Browser 具备强大的模拟能力,它也成为各大平台重点识别和限制的对象。
近年来,网站风控系统不再只看 IP 和请求频率,而是更加关注浏览器指纹、行为轨迹和执行环境。一些常见的 Headless Browser 在默认配置下,会暴露出明显的自动化特征,例如特定的 JavaScript 属性、异常的渲染行为或不符合真实用户的操作节奏。
这也是很多人会遇到“明明用了无头浏览器,却还是被封”的根本原因。
Headless Browser 本身不是问题,问题在于环境是否足够真实、行为是否足够自然、网络身份是否可信。

无头浏览器与网络环境的关系

很多人忽略了一个关键问题:Headless Browser 再强,也只是“浏览器”,它依然运行在某个网络环境之中。
如果网络出口本身存在异常,例如 IP 来源不可信、ASN 被标记、地址被滥用过,那么即使浏览器层面完全模拟真实用户,依然可能在请求阶段就被拦截。
这也是为什么在高风控场景下,无头浏览器往往需要配合更稳定、更接近真实用户的网络环境使用。真实住宅网络、干净的出口地址,往往比复杂的浏览器参数伪装更重要。

Headless Browser 是否等同于“自动化作弊”

这是一个经常被误解的问题。
Headless Browser 本身是一种技术工具,它既可以用于合规的自动化测试、数据分析和效率提升,也可能被滥用于违规操作。关键并不在于工具本身,而在于使用方式是否符合平台规则和法律边界。
在合规使用前提下,无头浏览器反而是很多正规企业和开发团队不可或缺的基础设施。

未来趋势:Headless Browser 正在变得“更像人”

随着反自动化技术的不断升级,Headless Browser 也在持续进化。从早期简单执行脚本,到如今强调完整指纹一致性、行为轨迹自然化和环境真实性,无头浏览器正在向“高度拟真浏览环境”发展。
未来,它不再只是一个技术工具,而是整个网络身份系统中的一个重要组成部分。

结语

Headless Browser 并不是一个神秘或危险的概念,它只是现代 Web 技术发展下的必然产物。理解它的原理、边界和真实价值,远比盲目使用或一味回避更重要。
当你真正把它看作一个“没有界面的真实浏览器”,并将其放入合适的网络与合规框架中使用,它所能发挥的价值,远远超出想象。

第一步:开启 Chrome 远程调试权限

这是所有操作的前提,必须手动开启。

  1. 打开 Chrome 浏览器,在地址栏输入:chrome://inspect/#remote-debugging
  2. 勾选 “Allow remote debugging for this browser instance”
  1. 确认下方出现:Server running at: 127.0.0.1:9222(这表示“门”已打开)。


第二步:配置 MCP 服务

AionUI 图形界面配置

  1. 进入 AionUI 的 Settings(设置)Tools Settings(工具设置)

  2. MCP Management 区域点击 Add MCP Service

  3. 选择 Add via JSON,粘贴以下配置:

{ "mcpServers": { "chrome-devtools": { "command": "npx", "args": [ "-y", "chrome-devtools-mcp@latest", "--autoConnect", "--channel=stable" ] } } } 


第三步:激活连接(关键动作)

配置完成后,Gemini 并不会立刻接管浏览器,需要一次“握手”:

  1. 在 Gemini CLI 或 AionUI 中输入第一个指令,例如:截取当前网页的屏幕

  2. 切换回 Chrome 浏览器:你会看到页面顶部弹出一个系统确认框,询问 “Allow incoming debugging connection?”

  3. 点击 Allow(允许)

  4. 浏览器顶部出现横幅(显示“正在受自动测试软件控制”),说明连接成功。


第四步:实战常用指令

连接成功后,你可以直接用自然语言指挥 Gemini 处理你当前看到的网页:

  • 视觉分析
    • “帮我截个图,分析一下现在的 UI 布局有没有错位?”
  • 性能诊断
    • “分析当前页面的性能,告诉我 LCP(最大内容绘制)是多少,怎么优化?”
  • 代码调试
    • “检查当前页面的 Console 控制台,有没有报错?如果有,帮我解释原因。”
  • 网页抓取与操作
    • “提取当前网页中所有商品的价格并列成表格。”
    • “帮我点击页面上的‘提交’按钮。”(AI 会通过 DOM 树自动定位元素)


推荐模型:Gemini 3 Flash、Gemini 3 Pro、GLM 4.7(英伟达无限免费 API)
使用案例:



感谢 的AionUI
AionUi V1.7.4 更新:兼容了Newapi(Cowork开源版可以用公益站/中转站了)

也欢迎使用我的 AMC,这个教程的方法也都是我用 AMC 的深度搜索功能学会的
AMC更新:支持Markdown 转 PDF、划词 TTS


推荐使用 @bbbugg 佬的AIStudioToAPI 构建自己的API 池
https://linux.do/t/topic/1371269

【二改】二改build反代(AIStudio to API),优化云端部署


📌 转载信息
原作者: yeahhe
转载时间: 2026/1/25 08:06:46

作者:辰泉

前言

在 Agentic AI 时代,智能体需要与真实世界交互,而浏览器是连接虚拟世界与现实世界的重要桥梁。AgentRun Browser Sandbox 为智能体提供了安全、高性能、免运维的浏览器执行环境,让 AI Agent 真正具备“上网”的能力——从网页抓取、信息提取到表单填写、自动化操作,一切皆可实现。

AgentRun Browser Sandbox 介绍

什么是 Browser Sandbox?

Browser Sandbox 是 AgentRun 平台提供的云原生无头浏览器沙箱服务,基于阿里云函数计算(FC)构建。它为智能体提供了一个安全隔离的浏览器执行环境,支持通过标准的 Chrome DevTools Protocol (CDP) 远程控制浏览器实例。

核心特性

无头浏览器能力

  • 内置 Chromium/Chrome 浏览器,支持完整的 Web 标准
  • 原生兼容 Puppeteer、Playwright 等主流自动化框架
  • 支持通过 CDP 协议进行精细化控制

实时可视化

  • 内置 VNC 服务,支持实时查看浏览器界面
  • 提供操作录制功能,方便调试和回放
  • 支持通过 noVNC 客户端在网页中直接交互

安全与隔离

  • 每个沙箱实例运行在独立的容器环境中
  • 文件系统和进程空间完全隔离
  • 支持 WSS 加密传输,确保数据安全

Serverless 架构

  • 按需创建,按量付费,无需提前预置资源
  • 快速弹性伸缩,支持高并发场景
  • 零运维,无需管理服务器和浏览器依赖

主要应用场景

  • AI Agent 赋能: 为大模型提供“眼睛”和“手”,执行网页浏览、信息提取、在线操作等任务
  • 自动化测试: 在云端运行端到端(E2E)测试和视觉回归测试
  • 数据采集: 稳定、高效地进行网页抓取,应对动态加载和反爬虫挑战
  • 内容生成: 自动化生成网页截图或 PDF 文档

上手使用 AgentRun Browser Sandbox

AgentRun SDK 快速介绍

后续的内容将基于 AgentRun SDK 进行,因此我们先对 SDK 进行简要介绍。

Agentrun SDK 是一个开源的开发者工具包,本期介绍 Python 版本。其旨在简化智能体与 AgentRun 平台各种服务(包括 Browser Sandbox)的集成。它提供了统一的接口,让您可以用几行代码就将沙箱能力集成到现有的 Agent 框架中。SDK 的核心功能如下:

统一集成接口

  • 提供对 LangChain、AgentScope 等主流框架的开箱即用支持
  • 统一的模型代理接口,简化多模型管理
  • 标准化的工具注册机制

Sandbox 生命周期管理

  • 自动创建和销毁沙箱实例
  • 支持会话级别的状态保持
  • 灵活的资源配置和超时控制

安装 AgentRun SDK

pip install agentrun-sdk[playwright,server]

注意: 确保您的 Python 环境版本在 3.10 及以上。

基本使用示例

以下是使用 AgentRun SDK 创建和管理 Browser Sandbox 的核心代码:

from agentrun.sandbox import Sandbox, TemplateType
from playwright.sync_api import sync_playwright
# 创建 Browser Sandbox
sandbox = Sandbox.create(
    template_type=TemplateType.BROWSER,
    template_name="your-template-name",
    sandbox_idle_timeout_seconds=300
)
# 获取 CDP URL(用于 Playwright 连接)
cdp_url = sandbox.get_cdp_url()
# 使用 Playwright 连接并操作
with sync_playwright() as p:
    browser = p.chromium.connect_over_cdp(cdp_url)
    page = browser.contexts[0].pages[0]
    page.goto("https://www.example.com")
    page.screenshot(path="screenshot.png")
    browser.close()
# 销毁 Sandbox
sandbox.delete()

关键概念:

  • template_name: 控制台创建的浏览器环境模板
  • cdp_url: 用于 Playwright/Puppeteer 连接
  • vnc_url: 用于实时查看浏览器画面(可通过 sandbox.get_cdp_url() 获取)

注意: 由于所有浏览器操作都在云端进行,您无需在本地安装浏览器。Playwright 仅用于通过 CDP 协议连接到云端的浏览器实例。

如何创建 Sandbox 模板

使用 Browser Sandbox 需要新建 Sandbox 模板,您需要访问 AgentRun 控制台网站 [ 1] ,并按照如下步骤创建模板:

  1. 在顶部菜单栏选择“运行时与沙箱”;
  2. 在左侧边栏选择“Sandbox 沙箱”;
  3. 点击右上角“创建沙箱模板”;

image

  1. 选择“浏览器”;

image

  1. 在弹出的抽屉对话框中填写和选择您的模板的规格、网络等配置,并复制模板名称;

image

  1. 点击“创建浏览器”等待其就绪即可。

从零开始用 LangChain 创建 Browser Sandbox 智能体

本教程将指导您从零开始创建一个完整的 Browser Sandbox 智能体项目。

基于 LangChain 集成 Browser Sandbox

本教程将详细讲解如何使用 LangChain 创建 Browser Sandbox 相关的 Tools 并集成到 Agent 中。

项目结构

为了保持代码的内聚性和可维护性,我们将代码拆分为以下模块:

模块职责划分:

sandbox_manager.py:负责 Sandbox 的创建、管理和销毁,提供统一的接口
langchain_agent.py:负责创建 LangChain Tools 和 Agent,集成 VNC 信息
main.py:作为入口文件,演示如何使用上述模块

步骤 1:创建项目并安装依赖

首先创建项目目录(如果还没有):

mkdir -p langchain-demo
cd langchain-demo

创建 requirements.txt 文件,内容如下:

# LangChain 核心库
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
# AgentRun SDK
agentrun-sdk[playwright,server]>=0.0.8
# 浏览器自动化
playwright>=1.40.0
# 环境变量管理
python-dotenv>=1.0.0

然后安装依赖:

pip install -r requirements.txt

主要依赖说明:

  • langchain 和 langchain-openai:LangChain 核心库
  • agentrun-sdk[playwright,server]:AgentRun SDK,用于 Sandbox 管理
  • playwright:浏览器自动化库
  • python-dotenv:环境变量管理

步骤 2:配置环境变量

在项目根目录创建 .env 文件,配置以下环境变量:

# 阿里云百炼平台的 API Key,用于调用大模型能力
# 请前往 https://bailian.console.aliyun.com/?tab=app#/api-key 创建和查看
DASHSCOPE_API_KEY=sk-your-bailian-api-key
# 阿里云账号的访问密钥 ID 和访问密钥 Secret,用于 AgentRun SDK 鉴权
ALIBABA_CLOUD_ACCESS_KEY_ID=your-ak
ALIBABA_CLOUD_ACCESS_KEY_SECRET=your-sk
ALIBABA_CLOUD_ACCOUNT_ID=your-main-account-id
ALIBABA_CLOUD_REGION=cn-hangzhou
# browser sandbox 模板的名称,可以在 https://functionai.console.aliyun.com/cn-hangzhou/agent/runtime/sandbox 控制台创建
BROWSER_TEMPLATE_NAME=sandbox-your-template-name
# agentrun 的控制面和数据面的 API 端点请求地址,默认cn-hangzhou
AGENTRUN_CONTROL_ENDPOINT=agentrun.cn-hangzhou.aliyuncs.com
AGENTRUN_DATA_ENDPOINT=https://${your-main-account-id}.agentrun-data.cn-hangzhou.aliyuncs.com

步骤 3:创建 Sandbox 生命周期管理模块

创建 sandbox_manager.py 文件,负责 Sandbox 的创建、管理和销毁。核心代码如下:

"""
Sandbox 生命周期管理模块
负责 AgentRun Browser Sandbox 的创建、管理和销毁。
提供统一的接口供 LangChain Agent 使用。
"""
import os
from typing import Optional, Dict, Any
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
class SandboxManager:
    """Sandbox 生命周期管理器"""
    def __init__(self):
        self._sandbox: Optional[Any] = None
        self._sandbox_id: Optional[str] = None
        self._cdp_url: Optional[str] = None
        self._vnc_url: Optional[str] = None
    def create(
        self,
        template_name: Optional[str] = None,
        idle_timeout: int = 3000
    ) -> Dict[str, Any]:
        """
        创建或获取一个浏览器 sandbox 实例
        Args:
            template_name: Sandbox 模板名称,如果为 None 则从环境变量读取
            idle_timeout: 空闲超时时间(秒),默认 3000 秒
        Returns:
            dict: 包含 sandbox_id, cdp_url, vnc_url 的字典
        Raises:
            RuntimeError: 创建失败时抛出异常
        """
        try:
            from agentrun.sandbox import Sandbox, TemplateType
            # 如果已有 sandbox,直接返回
            if self._sandbox is not None:
                return self.get_info()
            # 从环境变量获取模板名称
            if template_name is None:
                template_name = os.getenv(
                    "BROWSER_TEMPLATE_NAME",
                    "sandbox-browser-demo"
                )
            # 创建 sandbox
            self._sandbox = Sandbox.create(
                template_type=TemplateType.BROWSER,
                template_name=template_name,
                sandbox_idle_timeout_seconds=idle_timeout
            )
            self._sandbox_id = self._sandbox.sandbox_id
            self._cdp_url = self._get_cdp_url()
            self._vnc_url = self._get_vnc_url()
            return self.get_info()
        except ImportError as e:
            print(e)
            raise RuntimeError(
                "agentrun-sdk 未安装,请运行: pip install agentrun-sdk[playwright,server]"
            )
        except Exception as e:
            raise RuntimeError(f"创建 Sandbox 失败: {str(e)}")
    def get_info(self) -> Dict[str, Any]:
        """
        获取当前 sandbox 的信息
        Returns:
            dict: 包含 sandbox_id, cdp_url, vnc_url 的字典
        Raises:
            RuntimeError: 如果没有活动的 sandbox
        """
        if self._sandbox is None:
            raise RuntimeError("没有活动的 sandbox,请先创建")
        return {
            "sandbox_id": self._sandbox_id,
            "cdp_url": self._cdp_url,
            "vnc_url": self._vnc_url,
        }
    def get_cdp_url(self) -> Optional[str]:
        """获取 CDP URL"""
        return self._sandbox.get_cdp_url()
    def get_vnc_url(self) -> Optional[str]:
        """获取 VNC URL"""
        return self._sandbox.get_vnc_url()
    def get_sandbox_id(self) -> Optional[str]:
        """获取 Sandbox ID"""
        return self._sandbox_id
    def destroy(self) -> str:
        """
        销毁当前的 sandbox 实例
        Returns:
            str: 操作结果描述
        """
        if self._sandbox is None:
            return "没有活动的 sandbox"
        try:
            sandbox_id = self._sandbox_id
            # 尝试销毁 sandbox
            if hasattr(self._sandbox, 'delete'):
                self._sandbox.delete()
            elif hasattr(self._sandbox, 'stop'):
                self._sandbox.stop()
            elif hasattr(self._sandbox, 'destroy'):
                self._sandbox.destroy()
            # 清理状态
            self._sandbox = None
            self._sandbox_id = None
            self._cdp_url = None
            self._vnc_url = None
            return f"Sandbox 已销毁: {sandbox_id}"
        except Exception as e:
            # 即使销毁失败,也清理本地状态
            self._sandbox = None
            self._sandbox_id = None
            self._cdp_url = None
            self._vnc_url = None
            return f"销毁 Sandbox 时出错: {str(e)}"
    def is_active(self) -> bool:
        """检查 sandbox 是否活跃"""
        return self._sandbox is not None
    def __enter__(self):
        """上下文管理器入口"""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """上下文管理器退出,自动销毁"""
        self.destroy()
        return False
# 全局单例(可选,用于简单场景)
_global_manager: Optional[SandboxManager] = None
def get_global_manager() -> SandboxManager:
    """获取全局 SandboxManager 单例"""
    global _global_manager
    if _global_manager is None:
        _global_manager = SandboxManager()
    return _global_manager
def reset_global_manager():
    """重置全局 SandboxManager"""
    global _global_manager
    if _global_manager:
        _global_manager.destroy()
    _global_manager = None

关键功能:

  1. 创建 Sandbox: 使用 AgentRun SDK 创建浏览器 Sandbox
  2. 获取连接信息: 自动获取 CDP URL 和 VNC URL,支持多种属性名兼容
  3. 生命周期管理: 提供销毁方法,确保资源正确释放

步骤 4:创建 LangChain Tools 和 Agent

创建 langchain_agent.py 文件,定义 LangChain Tools 并创建 Agent。核心代码如下:

"""
LangChain Agent 和 Tools 注册模块
负责创建 LangChain Agent,注册 Sandbox 相关的 tools,并集成 VNC 可视化。
本模块使用 sandbox_manager.py 中封装的 SandboxManager 来管理 sandbox 生命周期。
"""
import os
from dotenv import load_dotenv
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from pydantic import BaseModel, Field
# 导入 sandbox 管理器
from sandbox_manager import SandboxManager
# 加载环境变量
load_dotenv()
# 全局 sandbox 管理器实例(单例模式)
_sandbox_manager: SandboxManager | None = None
def get_sandbox_manager() -> SandboxManager:
    """获取 sandbox 管理器实例(单例模式)"""
    global _sandbox_manager
    if _sandbox_manager is None:
        _sandbox_manager = SandboxManager()
    return _sandbox_manager
# ============ LangChain Tools 定义 ============
@tool
def create_browser_sandbox(
    template_name: str = None,
    idle_timeout: int = 3000
) -> str:
    """创建或获取一个浏览器 sandbox 实例。
    当需要访问网页、执行浏览器操作时,首先需要创建 sandbox。
    创建成功后,会返回 sandbox 信息,包括 VNC URL 用于可视化。
    Args:
        template_name: Sandbox 模板名称,如果不提供则从环境变量 BROWSER_TEMPLATE_NAME 读取
        idle_timeout: 空闲超时时间(秒),默认 3000 秒
    Returns:
        Sandbox 信息字符串,包括 ID、CDP URL、VNC URL
    """
    try:
        manager = get_sandbox_manager()
        # 如果 template_name 为空字符串,转换为 None 以便从环境变量读取
        if template_name == "":
            template_name = None
        info = manager.create(template_name=template_name, idle_timeout=idle_timeout)
        result = f"""✅ Sandbox 创建成功!
📋 Sandbox 信息:
- ID: {info['sandbox_id']}
- CDP URL: {info['cdp_url']}
"""
        vnc_url = info.get('vnc_url')
        if vnc_url:
            result += f"- VNC URL: {vnc_url}\n\n"
            result += "提示: VNC 查看器应该已自动打开,您可以在浏览器中实时查看浏览器操作。"
        else:
            result += "\n警告: 未获取到 VNC URL,可能无法使用可视化功能。"
        return result
    except Exception as e:
        return f" 创建 Sandbox 失败: {str(e)}"
@tool
def get_sandbox_info() -> str:
    """获取当前 sandbox 的详细信息,包括 ID、CDP URL、VNC URL 等。
    当需要查看当前 sandbox 状态或获取 VNC 连接信息时使用此工具。
    Returns:
        Sandbox 信息字符串
    """
    try:
        manager = get_sandbox_manager()
        info = manager.get_info()
        result = f"""📋 当前 Sandbox 信息:
- Sandbox ID: {info['sandbox_id']}
- CDP URL: {info['cdp_url']}
"""
        if info.get('vnc_url'):
            result += f"- VNC URL: {info['vnc_url']}\n\n"
            result += "您可以使用 VNC URL 在浏览器中实时查看操作过程。\n"
            result += "   推荐使用 vnc.html 文件或 noVNC 客户端。"
        return result
    except RuntimeError as e:
        return f" {str(e)}"
    except Exception as e:
        return f" 获取 Sandbox 信息失败: {str(e)}"
class NavigateInput(BaseModel):
    """浏览器导航输入参数"""
    url: str = Field(description="要访问的网页 URL,必须以 http:// 或 https:// 开头")
    wait_until: str = Field(
        default="load",
        description="等待页面加载的状态: load, domcontentloaded, networkidle"
    )
    timeout: int = Field(
        default=30000,
        description="超时时间(毫秒),默认 30000"
    )
@tool(args_schema=NavigateInput)
def navigate_to_url(url: str, wait_until: str = "load", timeout: int = 30000) -> str:
    """使用 sandbox 中的浏览器导航到指定 URL。
    当用户需要访问网页时使用此工具。导航后可以在 VNC 中实时查看页面。
    Args:
        url: 要访问的网页 URL
        wait_until: 等待页面加载的状态(load/domcontentloaded/networkidle)
        timeout: 超时时间(毫秒)
    Returns:
        导航结果描述
    """
    try:
        manager = get_sandbox_manager()
        if not manager.is_active():
            return " 错误: 请先创建 sandbox"
        # 验证 URL
        if not url.startswith(("http://", "https://")):
            return f" 错误: 无效的 URL 格式: {url}"
        cdp_url = manager.get_cdp_url()
        if not cdp_url:
            return " 错误: 无法获取 CDP URL"
        # 使用 Playwright 连接浏览器并导航
        try:
            from playwright.sync_api import sync_playwright
            with sync_playwright() as p:
                browser = p.chromium.connect_over_cdp(cdp_url)
                pages = browser.contexts[0].pages if browser.contexts else []
                if pages:
                    page = pages[0]
                else:
                    page = browser.new_page()
                page.goto(url, wait_until=wait_until, timeout=timeout)
                title = page.title()
                return f"已成功导航到: {url}\n📄 页面标题: {title}\n💡 您可以在 VNC 中查看页面内容。"
        except ImportError:
            return f"导航指令已发送: {url}\n💡 提示: 安装 playwright 以启用实际导航功能 (pip install playwright)"
        except Exception as e:
            return f" 导航失败: {str(e)}"
    except Exception as e:
        return f" 操作失败: {str(e)}"
@tool("browser_screenshot", description="在浏览器 sandbox 中截取当前页面截图")
def take_screenshot(filename: str = "screenshot.png") -> str:
    """截取浏览器当前页面的截图。
    Args:
        filename: 截图文件名,默认 "screenshot.png"
    Returns:
        操作结果
    """
    try:
        manager = get_sandbox_manager()
        if not manager.is_active():
            return " 错误: 请先创建 sandbox"
        cdp_url = manager.get_cdp_url()
        if not cdp_url:
            return " 错误: 无法获取 CDP URL"
        try:
            from playwright.sync_api import sync_playwright
            with sync_playwright() as p:
                browser = p.chromium.connect_over_cdp(cdp_url)
                pages = browser.contexts[0].pages if browser.contexts else []
                if pages:
                    page = pages[0]
                else:
                    return " 错误: 没有打开的页面"
                page.screenshot(path=filename)
                return f"截图已保存: {filename}"
        except ImportError:
            return " 错误: 需要安装 playwright (pip install playwright)"
        except Exception as e:
            return f" 截图失败: {str(e)}"
    except Exception as e:
        return f" 操作失败: {str(e)}"
@tool("destroy_sandbox", description="销毁当前的 sandbox 实例,释放资源。注意:仅在程序退出或明确需要释放资源时使用,不要在一轮对话后销毁。")
def destroy_sandbox() -> str:
    """销毁当前的 sandbox 实例。
    重要提示:此工具应该仅在以下情况使用:
    - 程序即将退出
    - 明确需要释放资源
    - 用户明确要求销毁
    不要在一轮对话完成后就销毁 sandbox,因为 sandbox 可以在多轮对话中复用。
    Returns:
        操作结果
    """
    try:
        manager = get_sandbox_manager()
        result = manager.destroy()
        return result
    except Exception as e:
        return f" 销毁失败: {str(e)}"
# ============ Agent 创建 ============
def create_browser_agent(system_prompt: str = None):
    """
    创建带有 sandbox 工具的 LangChain Agent
    Args:
        system_prompt: 自定义系统提示词,如果为 None 则使用默认提示词
    Returns:
        LangChain Agent 实例
    """
    # 配置 DashScope API
    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 DASHSCOPE_API_KEY")
    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    model_name = os.getenv("QWEN_MODEL", "qwen-plus")
    # 创建 LLM
    model = ChatOpenAI(
        model=model_name,
        api_key=api_key,
        base_url=base_url,
        temperature=0.7,
    )
    # 创建工具列表
    tools = [
        create_browser_sandbox,
        get_sandbox_info,
        navigate_to_url,
        take_screenshot,
        destroy_sandbox,
    ]
    # 默认系统提示词
    if system_prompt is None:
        system_prompt = """你是一个浏览器自动化助手,可以使用 sandbox 来访问和操作网页。
当用户需要访问网页时,请按以下步骤操作:
1. 首先创建或获取 sandbox(如果还没有)
2. 使用 navigate_to_url 导航到目标网页
3. 执行用户请求的操作
4. 如果需要,可以截取截图
重要提示:
- 创建 sandbox 后,会返回 VNC URL,用户可以使用它实时查看浏览器操作
- 所有操作都会在 VNC 中实时显示,方便调试和监控
- sandbox 可以在多轮对话中复用,不要在一轮对话完成后就销毁
- 只有在用户明确要求销毁时才使用 destroy_sandbox 工具
- 不要主动建议用户销毁 sandbox,除非用户明确要求
- 请始终用中文回复,确保操作准确、高效。"""
    # 创建 Agent
    agent = create_agent(
        model=model,
        tools=tools,
        system_prompt=system_prompt,
    )
    return agent
def get_available_tools():
    """获取所有可用的工具列表"""
    return [
        create_browser_sandbox,
        get_sandbox_info,
        navigate_to_url,
        take_screenshot,
        destroy_sandbox,
    ]

关键要点:

  1. Tool 定义: 使用 @tool 装饰器定义 LangChain Tools
  2. 类型提示: 所有参数必须有类型提示,用于生成工具 schema
  3. 文档字符串: 详细的文档字符串帮助 LLM 理解何时使用工具**
  4. 单例模式: 使用全局管理器实例确保 Sandbox 在会话中复用**

步骤 5:创建主入口文件

创建 main.py 文件,作为程序入口。核心代码如下:

"""
LangChain + AgentRun Browser Sandbox 集成示例
主入口文件,演示如何使用 LangChain Agent 与 AgentRun Browser Sandbox 集成。
"""
import os
import sys
import signal
import webbrowser
import urllib.parse
import threading
import http.server
import socketserver
from pathlib import Path
from dotenv import load_dotenv
from langchain_agent import create_browser_agent, get_sandbox_manager
# 加载环境变量
load_dotenv()
# 全局 HTTP 服务器实例
_http_server = None
_http_port = 8080
# 全局清理标志,用于防止重复清理
_cleanup_done = False
def start_http_server():
    """启动一个简单的 HTTP 服务器来提供 vnc.html"""
    global _http_server
    if _http_server is not None:
        return _http_port
    try:
        current_dir = Path(__file__).parent.absolute()
        class VNCRequestHandler(http.server.SimpleHTTPRequestHandler):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, directory=str(current_dir), **kwargs)
            def log_message(self, format, *args):
                # 静默日志,避免输出过多信息
                pass
        # 尝试启动服务器
        for port in range(_http_port, _http_port + 10):
            try:
                server = socketserver.TCPServer(("", port), VNCRequestHandler)
                server.allow_reuse_address = True
                # 在后台线程中运行服务器
                def run_server():
                    server.serve_forever()
                thread = threading.Thread(target=run_server, daemon=True)
                thread.start()
                _http_server = server
                return port
            except OSError:
                continue
        return None
    except Exception as e:
        print(f"启动 HTTP 服务器失败: {str(e)}")
        return None
def open_vnc_viewer(vnc_url: str):
    """
    自动打开 VNC 查看器并设置 VNC URL
    Args:
        vnc_url: VNC WebSocket URL
    """
    if not vnc_url:
        return
    try:
        # 获取当前文件所在目录
        current_dir = Path(__file__).parent.absolute()
        vnc_html_path = current_dir / "vnc.html"
        # 检查文件是否存在
        if not vnc_html_path.exists():
            print(f"警告: vnc.html 文件不存在: {vnc_html_path}")
            print_vnc_info(vnc_url)
            return
        # 启动 HTTP 服务器
        port = start_http_server()
        if port:
            # 编码 VNC URL 作为 URL 参数
            encoded_url = urllib.parse.quote(vnc_url, safe='')
            # 构建 HTTP URL
            http_url = f"http://localhost:{port}/vnc.html?url={encoded_url}"
            # 打开浏览器
            print(f"\n正在打开 VNC 查看器...")
            print(f"HTTP 服务器运行在: http://localhost:{port}")
            print(f"VNC URL: {vnc_url[:80]}...")
            print(f"完整 URL: {http_url[:100]}...")
            webbrowser.open(http_url)
            print(f"VNC 查看器已打开")
            print(f"VNC URL 已通过 URL 参数自动设置,页面加载后会自动连接")
        else:
            # 如果 HTTP 服务器启动失败,尝试使用 file:// 协议
            print(f"HTTP 服务器启动失败,尝试使用文件协议...")
            encoded_url = urllib.parse.quote(vnc_url, safe='')
            file_url = f"file://{vnc_html_path}?url={encoded_url}"
            webbrowser.open(file_url)
            print(f"VNC 查看器已打开(使用文件协议)")
            print(f"提示: 如果无法自动连接,请手动复制 VNC URL 到输入框")
    except Exception as e:
        print(f"自动打开 VNC 查看器失败: {str(e)}")
        print_vnc_info(vnc_url)
def print_vnc_info(vnc_url: str):
    """打印 VNC 连接信息"""
    if not vnc_url:
        return
    print("\n" + "=" * 60)
    print("VNC 可视化连接信息")
    print("=" * 60)
    print(f"\nVNC URL: {vnc_url}")
    print("\n使用方式:")
    print("   1. 使用 noVNC 客户端连接")
    print("   2. 或在浏览器中访问 VNC 查看器页面")
    print("   3. 实时查看浏览器操作过程")
    print("\n" + "=" * 60 + "\n")
def cleanup_sandbox():
    """
    清理 sandbox 资源
    这个函数可以被信号处理器、异常处理器和正常退出流程调用
    """
    global _cleanup_done
    # 防止重复清理
    if _cleanup_done:
        return
    _cleanup_done = True
    try:
        manager = get_sandbox_manager()
        if manager.is_active():
            print("\n" + "=" * 60)
            print("正在清理 sandbox...")
            print("=" * 60)
            result = manager.destroy()
            print(f"清理结果: {result}\n")
        else:
            print("\n没有活动的 sandbox 需要清理\n")
    except Exception as e:
        print(f"\n清理 sandbox 时出错: {str(e)}\n")
def signal_handler(signum, frame):
    """
    信号处理器,处理 Ctrl+C (SIGINT) 和其他信号
    Args:
        signum: 信号编号
        frame: 当前堆栈帧
    """
    print("\n\n收到中断信号,正在清理资源...")
    cleanup_sandbox()
    print("清理完成")
    sys.exit(0)
def main():
    """主函数"""
    global _cleanup_done
    # 重置清理标志
    _cleanup_done = False
    # 注册信号处理器,处理 Ctrl+C (SIGINT)
    signal.signal(signal.SIGINT, signal_handler)
    # 在 Windows 上,SIGBREAK 也可以处理
    if hasattr(signal, 'SIGBREAK'):
        signal.signal(signal.SIGBREAK, signal_handler)
    print("=" * 60)
    print("LangChain + AgentRun Browser Sandbox 集成示例")
    print("=" * 60)
    print()
    try:
        # 创建 Agent
        print("正在初始化 LangChain Agent...")
        agent = create_browser_agent()
        print("Agent 初始化完成\n")
        # 示例查询
        queries = [
            "创建一个浏览器 sandbox",
            "获取当前 sandbox 的信息,包括 VNC URL",
            "导航到 https://www.aliyun.com",
            "截取当前页面截图",
        ]
        # 执行查询
        for i, query in enumerate(queries, 1):
            print(f"\n{'=' * 60}")
            print(f"查询 {i}: {query}")
            print(f"{'=' * 60}\n")
            try:
                result = agent.invoke({
                    "messages": [{"role": "user", "content": query}]
                })
                # 提取最后一条消息的内容
                output = result.get("messages", [])[-1].content if isinstance(result.get("messages"), list) else result.get("output", str(result))
                print(f"\n结果:\n{output}\n")
                # 如果是创建 sandbox,自动打开 VNC 查看器
                if i == 1:
                    try:
                        # 等待一下确保 sandbox 完全创建
                        import time
                        time.sleep(1)
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            vnc_url = info.get('vnc_url')
                            if vnc_url:
                                print(f"\n检测到 VNC URL: {vnc_url[:80]}...")
                                open_vnc_viewer(vnc_url)
                                print_vnc_info(vnc_url)
                            else:
                                print("\n警告: 未获取到 VNC URL,请检查 sandbox 创建是否成功")
                    except Exception as e:
                        print(f"打开 VNC 查看器时出错: {str(e)}")
                        import traceback
                        traceback.print_exc()
                # 如果是获取信息,显示 VNC 信息
                elif i == 2:
                    try:
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            if info.get('vnc_url'):
                                print_vnc_info(info['vnc_url'])
                    except:
                        pass
            except Exception as e:
                print(f"查询失败: {str(e)}\n")
                import traceback
                traceback.print_exc()
        # 交互式查询
        print("\n" + "=" * 60)
        print("进入交互模式(输入 'quit' 或 'exit' 退出,Ctrl+C 或 Ctrl+D 中断)")
        print("=" * 60 + "\n")
        while True:
            try:
                user_input = input("请输入您的查询: ").strip()
            except EOFError:
                # 处理 Ctrl+D (EOF)
                print("\n\n检测到输入结束 (Ctrl+D),正在清理资源...")
                cleanup_sandbox()
                print("清理完成")
                break
            except KeyboardInterrupt:
                # 处理 Ctrl+C (在 input 调用期间)
                print("\n\n检测到中断信号 (Ctrl+C),正在清理资源...")
                cleanup_sandbox()
                print("清理完成")
                break
            if not user_input:
                continue
            if user_input.lower() in ['quit', 'exit', '退出']:
                print("\nBye")
                # 退出前清理 sandbox
                cleanup_sandbox()
                break
            try:
                result = agent.invoke({
                    "messages": [{"role": "user", "content": user_input}]
                })
                output = result.get("messages", [])[-1].content if isinstance(result.get("messages"), list) else result.get("output", str(result))
                print(f"\n结果:\n{output}\n")
                # 检查是否需要打开或显示 VNC 信息
                user_input_lower = user_input.lower()
                if "创建" in user_input_lower and "sandbox" in user_input_lower:
                    # 如果是创建 sandbox,自动打开 VNC 查看器
                    try:
                        # 等待一下确保 sandbox 完全创建
                        import time
                        time.sleep(1)
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            vnc_url = info.get('vnc_url')
                            if vnc_url:
                                print(f"\n检测到 VNC URL: {vnc_url[:80]}...")
                                open_vnc_viewer(vnc_url)
                                print_vnc_info(vnc_url)
                            else:
                                print("\n警告: 未获取到 VNC URL,请检查 sandbox 创建是否成功")
                    except Exception as e:
                        print(f"打开 VNC 查看器时出错: {str(e)}")
                        import traceback
                        traceback.print_exc()
                elif "sandbox" in user_input_lower or "vnc" in user_input_lower:
                    # 其他情况只显示信息
                    try:
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            if info.get('vnc_url'):
                                print_vnc_info(info['vnc_url'])
                    except:
                        pass
            except Exception as e:
                print(f"查询失败: {str(e)}\n")
                import traceback
                traceback.print_exc()
        # 清理资源(仅在程序正常退出时)
        cleanup_sandbox()
    except KeyboardInterrupt:
        # 处理顶层 KeyboardInterrupt (Ctrl+C)
        print("\n\n检测到中断信号 (Ctrl+C),正在清理资源...")
        cleanup_sandbox()
        print("清理完成")
        sys.exit(0)
    except EOFError:
        # 处理顶层 EOFError (Ctrl+D)
        print("\n\n检测到输入结束 (Ctrl+D),正在清理资源...")
        cleanup_sandbox()
        print("清理完成")
        sys.exit(0)
    except ValueError as e:
        print(f"配置错误: {str(e)}")
        print("\n提示: 请确保已设置以下环境变量:")
        print("   - DASHSCOPE_API_KEY: DashScope API Key")
        print("   - ALIBABA_CLOUD_ACCOUNT_ID: 阿里云账号 ID")
        print("   - ALIBABA_CLOUD_ACCESS_KEY_ID: 访问密钥 ID")
        print("   - ALIBABA_CLOUD_ACCESS_KEY_SECRET: 访问密钥 Secret")
        print("   - ALIBABA_CLOUD_REGION: 区域(默认: cn-hangzhou)")
    except Exception as e:
        print(f"发生错误: {str(e)}")
        import traceback
        traceback.print_exc()
        # 发生错误时也尝试清理
        cleanup_sandbox()
if __name__ == "__main__":
    main()

关键功能:

  1. VNC 自动打开: 创建 Sandbox 后自动打开 VNC 查看器
  2. 信号处理: 捕获 Ctrl+C,确保资源正确清理
  3. 交互模式: 支持持续对话,复用 Sandbox 实例

VNC 可视化集成

VNC(Virtual Network Computing)功能允许您实时查看和监控浏览器在 Sandbox 中的操作过程,这对于调试和监控 Agent 行为非常有用。

获取 VNC URL:

创建 Sandbox 后,可以通过 get_sandbox_info tool 获取 VNC URL:

# 通过 Agent 调用
result = agent.invoke({
    "messages": [{"role": "user", "content": "获取 sandbox 信息"}]
})
# 或直接通过管理器获取
manager = get_sandbox_manager()
info = manager.get_info()
vnc_url = info['vnc_url']

自动打开 VNC 查看器:

在 main.py 中,我们实现了自动打开 VNC 查看器的功能:

import webbrowser
import urllib.parse
from pathlib import Path
def open_vnc_viewer(vnc_url: str):
    """自动打开 VNC 查看器"""
    current_dir = Path(__file__).parent.absolute()
    vnc_html_path = current_dir / "vnc.html"
    if vnc_html_path.exists():
        # 通过 URL 参数传递 VNC URL
        encoded_url = urllib.parse.quote(vnc_url, safe='')
        file_url = f"file://{vnc_html_path}?url={encoded_url}"
        webbrowser.open(file_url)

VNC HTML 页面:

vnc.html 页面会从 URL 参数中读取 VNC URL,并自动连接到 VNC 服务器。页面包含以下核心功能:

  1. noVNC 库加载: 从 CDN 动态加载 noVNC 客户端库
  2. 自动连接: 读取 URL 参数中的 VNC URL 并自动连接
  3. 状态显示: 显示连接状态(连接中、已连接、已断开)
  4. 手动控制: 支持手动输入 VNC URL、断开重连等操作

核心 JavaScript 代码片段:

// 从 URL 参数获取 VNC URL
const urlParams = new URLSearchParams(window.location.search);
const vncUrl = urlParams.get('url');
// 加载 noVNC 库
async function loadNoVNC() {
    const module = await import('https://cdn.jsdelivr.net/gh/novnc/noVNC@v1.4.0/core/rfb.js');
    return module.default;
}
// 连接 VNC
async function connectVNC(url) {
    const RFB = await loadNoVNC();
    rfb = new RFB(vncScreen, url, {
        shared: true,
        credentials: { password: '' }
    });
    rfb.addEventListener('connect', () => {
        console.log('VNC 连接成功');
    });
}

完整的 vnc.html 文件可以在示例代码仓库中获取。

手动使用 VNC 查看器:

如果自动打开失败,您也可以手动使用 VNC 查看器:

1. 使用 noVNC 在线客户端:

  • 访问 noVNC 在线客户端 [ 2]
  • 在连接设置中填入 VNC URL
  • 点击连接

2. 使用本地 VNC HTML 页面:

  • 打开 vnc.html
  • 输入 VNC URL
  • 点击连接按钮

实时监控功能:

  • 所有浏览器操作都会在 VNC 中实时显示
  • 可以看到 Agent 的每一步操作(导航、点击、输入等)
  • 方便调试和监控 Agent 行为
  • 支持交互式操作(在 VNC 中直接操作浏览器)

运行和测试

python main.py

程序会自动:

  1. 创建 Browser Sandbox
  2. 打开 VNC 查看器(实时查看浏览器操作)
  3. 执行预设查询
  4. 进入交互模式

工作原理

为了更好地理解系统架构,我们将工作流程拆分为两个部分:LangChain Agent 工作流程和 SandboxManager 生命周期管理

1. LangChain Agent 工作流程

下图展示了 LangChain Agent 如何处理用户请求并调用相应的 Tools:

image

Agent 工作流程说明:

  1. 请求接收: 用户发起自然语言请求(如“访问淘宝首页并截图”)
  2. 意图分析: Agent 分析用户意图,决定需要调用哪些 Tools
  3. Tool 调用: 根据任务需求,顺序或组合调用多个 Tools
  4. Manager 交互: 所有 Tools 都通过 SandboxManager 单例实例操作 Sandbox
  5. 结果处理: Agent 将 Tool 返回的结果整合成用户友好的响应
  6. 多轮对话: Sandbox 在整个会话中保持活跃,支持多轮对话

5 个核心 Tools 的职责:

image

2. SandboxManager 生命周期管理

下图展示了 SandboxManager 如何管理 Sandbox 的完整生命周期:

image

SandboxManager 工作流程说明:

1. 单例管理:

  • 首次调用时创建 Manager 实例
  • 后续调用复用同一个实例
  • 确保整个会话只有一个 Sandbox

2. Sandbox 创建:

  • 调用 AgentRun SDK 的 Sandbox.create()
  • SDK 通过阿里云 API 与函数计算 FC 通信
  • FC 服务创建独立的容器实例,包含:
  • Chromium 浏览器 VNC 服务必要的运行环境

3. 连接信息获取:

  • CDP URL: WebSocket 地址,用于 Playwright/Puppeteer 远程控制浏览器
  • VNC URL: WebSocket 地址,用于实时查看浏览器画面**

4. 浏览器操作:

  • Playwright 通过 CDP URL 连接到远程浏览器
  • 执行各种浏览器操作(导航、点击、截图等)
  • VNC 同步显示操作过程,用户可实时监控

5. 资源清理:

  • 调用 destroy() 方法销毁 Sandbox
  • 清理 Manager 内部状态
  • 通过 SDK 释放云端资源

3. Agent 与 Manager 的协作关系

交互模式:

用户请求 → Agent → Tool → SandboxManager → AgentRun SDK → 云端 Sandbox
                                    ↓
用户响应 ← Agent ← Tool ← SandboxManager ← 操作结果

关键设计理念:

  1. 分层架构:
  • 用户层:自然语言交互
  • Agent 层:意图理解和任务分解
  • Tool 层:功能封装和参数验证
  • Manager 层:资源管理和状态维护
  • SDK 层:云服务通信
  • 云端层:实际的 Sandbox 环境
  1. 单例模式:
  • SandboxManager 使用单例模式
  • 保证整个会话中只有一个 Sandbox 实例
  • 避免资源浪费和状态冲突
  1. 状态复用:
  • Sandbox 在多轮对话中保持活跃
  • 减少创建和销毁的开销
  • 提供更流畅的用户体验
  1. 双通道设计:
  • CDP 通道:Agent 通过 Playwright 控制浏览器
  • VNC 通道:用户通过 VNC 查看器实时监控
  1. 解耦设计:
  • Tools 不直接操作 SDK,通过 Manager 统一管理
  • 便于扩展和维护
  • 统一的错误处理和资源管理

典型使用场景示例:

# 第 1 轮对话
用户: "创建一个 sandbox 并访问淘宝首页"
→ Agent 调用: create_browser_sandbox → navigate_to_url
→ Manager: 创建 Sandbox → Playwright 导航
→ 结果: "Sandbox 已创建,已访问淘宝首页"
# 第 2 轮对话(复用 Sandbox)
用户: "截取当前页面"
→ Agent 调用: take_screenshot
→ Manager: 使用现有 Sandbox → Playwright 截图
→ 结果: "截图已保存"
# 第 3 轮对话(复用 Sandbox)
用户: "访问京东首页"
→ Agent 调用: navigate_to_url
→ Manager: 使用现有 Sandbox → Playwright 导航
→ 结果: "已访问京东首页"

通过这种设计,Agent 专注于理解用户意图和任务编排,而 Manager 专注于 Sandbox 的生命周期管理,实现了清晰的职责分离。

工作原理总结:

  1. 工具注册:使用 @tool 装饰器将 Sandbox 功能封装为 LangChain Tools
  2. 生命周期管理: SandboxManager 负责 Sandbox 的创建、管理和销毁
  3. 状态保持:使用单例模式管理 Sandbox 实例,确保同一会话内复用
  4. VNC 集成:自动获取并返回 VNC URL,方便用户实时查看
  5. 错误处理:所有工具都包含完善的错误处理机制

扩展和定制

添加自定义 Tools:

@tool
def extract_table_data(url: str) -> str:
    """从网页中提取表格数据"""
    from playwright.sync_api import sync_playwright
    manager = get_sandbox_manager()
    cdp_url = manager.get_info()['cdp_url']
    with sync_playwright() as p:
        browser = p.chromium.connect_over_cdp(cdp_url)
        page = browser.contexts[0].pages[0]
        page.goto(url)
        tables = page.query_selector_all("table")
        return f"找到 {len(tables)} 个表格"

自定义提示词:

custom_prompt = """你是一个专业的网页数据提取助手。
在执行任务前,请先创建 sandbox,然后使用浏览器工具完成任务。"""
agent = create_browser_agent(system_prompt=custom_prompt)

最佳实践

  1. 模块化设计:将 Sandbox 管理和 Agent 创建分离,提高代码可维护性
  2. 错误处理:所有工具都应包含完善的错误处理
  3. 资源清理:使用信号处理器确保资源正确清理
  4. VNC 提示:在工具返回中包含 VNC URL,方便用户使用
  5. 单例模式:确保 Sandbox 实例在会话中复用,避免重复创建

前端集成可视化监控(VNC)

VNC 集成架构

下图展示了前端如何集成 VNC 实现实时监控:

image

轻量级 HTML 页面集成

创建一个简单的 vnc-viewer.html 文件:

<!DOCTYPE html>
<html>
<head>
    <title>Browser Sandbox VNC 查看器</title>
    <style>
        body { margin: 0; padding: 0; background: 
#000
; }
        
#vnc
-container { width: 100vw; height: 100vh; }
    </style>
</head>
<body>
    <div id="vnc-container"></div>
    <script type="module">
        const params = new URLSearchParams(window.location.search);
        const vncUrl = params.get('url');
        if (!vncUrl) {
            alert('请提供 VNC URL 参数');
        } else {
            const module = await import('https://cdn.jsdelivr.net/gh/novnc/noVNC@v1.4.0/core/rfb.js');
            const RFB = module.default;
            const rfb = new RFB(
                document.getElementById('vnc-container'),
                vncUrl,
                { shared: true, credentials: { password: '' } }
            );
            rfb.scaleViewport = true;
        }
    </script>
</body>
</html>

使用方式:

import webbrowser
import urllib.parse
vnc_url = sandbox.vnc_url
encoded_url = urllib.parse.quote(vnc_url, safe='')
viewer_url = f"file:///path/to/vnc-viewer.html?url={encoded_url}"
webbrowser.open(viewer_url)

React 应用集成

核心组件代码:

import React, { useEffect, useRef } from 'react';
interface VNCViewerProps {
  vncUrl: string;
  onConnect?: () => void;
  onDisconnect?: () => void;
}
export const VNCViewer: React.FC<VNCViewerProps> = ({ 
  vncUrl, 
  onConnect, 
  onDisconnect 
}) => {
  const containerRef = useRef<HTMLDivElement>(null);
  useEffect(() => {
    let rfb: any;
    const initVNC = async () => {
      if (!containerRef.current || !vncUrl) return;
      const { default: RFB } = await import('@novnc/novnc/core/rfb');
      rfb = new RFB(containerRef.current, vncUrl, {
        shared: true,
        credentials: { password: '' }
      });
      rfb.scaleViewport = true;
      rfb.addEventListener('connect', () => onConnect?.());
      rfb.addEventListener('disconnect', () => onDisconnect?.());
    };
    initVNC();
    return () => {
      if (rfb) rfb.disconnect();
    };
  }, [vncUrl, onConnect, onDisconnect]);
  return (
    <div 
      ref={containerRef} 
      style={{ width: '100%', height: '600px', background: '
#000
' }} 
    />
  );
};

使用示例:

import React, { useState, useEffect } from 'react';
import { VNCViewer } from './VNCViewer';
function App() {
  const [vncUrl, setVncUrl] = useState<string>('');
  useEffect(() => {
    fetch('/api/sandbox/create', { method: 'POST' })
      .then(res => res.json())
      .then(data => setVncUrl(data.vnc_url));
  }, []);
  return (
    <div>
      <h1>Browser Sandbox 实时监控</h1>
      {vncUrl ? (
        <VNCViewer 
          vncUrl={vncUrl}
          onConnect={() => console.log('已连接')}
          onDisconnect={() => console.log('已断开')}
        />
      ) : (
        <p>正在初始化...</p>
      )}
    </div>
  );
}

Puppeteer 和 Playwright 直接集成

如果您更熟悉传统的浏览器自动化库,也可以直接使用 Puppeteer 或 Playwright 连接到 Browser Sandbox。

使用 Playwright

from playwright.sync_api import sync_playwright
from agentrun.sandbox import Sandbox, TemplateType
# 创建 Sandbox
sandbox = Sandbox.create(
    template_type=TemplateType.BROWSER,
    template_name="your-template-name",
    sandbox_idle_timeout_seconds=3000
)
# 使用 Playwright 连接
with sync_playwright() as p:
    browser = p.chromium.connect_over_cdp(sandbox.cdp_url)
    page = browser.contexts[0].pages[0]
    # 执行操作
    page.goto("https://www.example.com")
    page.screenshot(path="screenshot.png")
    content = page.content()
    browser.close()
# 清理
sandbox.delete()

使用 Puppeteer(Node.js)

const puppeteer = require('puppeteer-core');
// CDP URL 从 Sandbox 获取
const cdpUrl = 'wss://your-account.funagent-data-pre.cn-hangzhou.aliyuncs.com/sandboxes/xxx/ws/automation';
(async () => {
  const browser = await puppeteer.connect({
    browserWSEndpoint: cdpUrl,
    defaultViewport: null
  });
  const page = (await browser.pages())[0];
  await page.goto('https://www.example.com');
  await page.screenshot({ path: 'screenshot.png' });
  await browser.close();
})();

总结

通过本教程,您已经学会了:

  1. AgentRun SDK 基础: 如何使用 SDK 创建和管理 Browser Sandbox
  2. LangChain 集成: 如何将 Sandbox 封装为 LangChain Tools
  3. VNC 可视化: 如何在前端集成 VNC 实现实时监控
  4. 直接集成: 如何使用 Puppeteer/Playwright 直接连接 Sandbox

相关链接:

[1] Agentrun 控制台网站

https://functionai.console.aliyun.com/cn-hangzhou/agent/runti...

[2] noVNC 在线客户端

https://novnc.com/noVNC/vnc.html

当项目中的接口测试用例和测试场景越积越多,单独管理和执行它们的成本会急剧上升。原本用于保障质量的自动化测试,自身反而成了维护的负担。

传统的维护方式是手动点选。当项目沉淀了大量用例和测试场景时,手动核对哪些该入库、哪些该回归,会成为沉重的体力成本。

Apifox「测试套件」通过动态模式解决了这个问题。它不再死板地记录 ID,而是保存一套筛选规则,例如按目录、标签、优先级等条件进行组合筛选。

在每次运行前,套件会根据筛选规则,自动组合所有符合规则的用例和测试场景。这意味着你只需专注于测试内容的编写和打标,新增的测试资产就会自动进入 CI/CD 流水线,真正实现无人值守的持续集成。

最终,所有执行项的结果会被汇总到一份聚合报告中,便于集中分析和定位问题。

创建并编排你的第一个套件

将 Apifox 更新到最新版本后,在「自动化测试」模块中,可以找到「测试套件」的分类。点击其右侧的 ... 按钮,选择「新建测试套件」。

在弹出的窗口中输入一个描述性的名称,配置相关的优先级或者标签,一个空的测试套件就创建完成了。

创建完成后,核心工作是向这个套件中添加内容。测试套件的内容可以是单个的「接口测试用例」,也可以是包含多个步骤的「测试场景」。

添加测试内容:静态与动态

点击「添加接口测试用例」或「添加测试场景」时,会看到「静态」和「动态」两种模式的选项。这两种模式决定了测试套件如何管理其包含的测试项,适用于不同的维护策略和测试目标。

静态模式,顾名思义,是精确地、不变地指定要执行的测试项。当你以静态模式勾选某些用例时,系统记录的是这些用例的唯一 ID。即使后续这些用例的源目录增加了新的用例,或者用例本身被移动,这个套件的执行范围也不会改变。它的确定性很高,确保了每次运行的内容完全一致。

动态模式则完全不同。它不记录具体的用例 ID,而是保存一套 “筛选规则”,例如 “某个目录下的所有用例” 或 “所有标签为「语义合法」的用例”。

又或者是 “所有标记为 P0 优先级的测试场景”。

在动态模式下,每次运行测试套件时,系统都会根据这套规则重新扫描整个项目,将所有当前符合条件的用例动态地纳入执行计划。这意味着,只要测试用例的属性(如所在目录、标签、优先级)符合规则,它就会被自动包含进来。

静态模式与动态模式:如何选择?

这两种模式没有绝对的优劣之分,而是服务于不同的管理需求。选择哪种模式,取决于你希望测试套件具备怎样的维护特性。

对于需要严格控制范围的专项测试,静态模式更可靠。而对于需要持续迭代、自动纳新的回归或冒烟测试,动态模式则能极大地降低维护成本。

为了更清晰地理解两种模式的差异,可以通过下表进行对比:

执行顺序与高级配置

添加完测试内容后,可以在编排列表中通过拖拽调整它们的执行顺序。

在执行项(测试场景)的右侧,可以对套件的运行行为进行更细粒度的控制。

例如,「遇到错误时」 选项可以决定当某个步骤失败后是继续执行、跳过当前轮次还是立即终止整个运行。「循环次数」则可以将整个套件重复执行多次,用于简单的稳定性测试。这些配置让测试套件不仅仅是一个用例的集合,更是一个可控的执行流程。

运行测试套件

构建好测试套件后,下一步就是执行它。Apifox 提供了从本地手动运行到云端自动化执行的多种方式,以适应不同阶段和环境的需求。

本地可视化运行

最直接的运行方式是在 Apifox 客户端界面中,点击「运行」按钮。这种方式会从本地机器发起请求,适用于在开发和调试阶段进行小规模、快速的测试验证。在运行配置界面,可以临时切换「运行环境」,或设置在运行结束后发送通知。

运行完成后,Apifox 会生成一份本次执行的测试报告,并在界面中以可视化方式展示。报告中会按执行顺序列出每一个接口测试用例和测试场景的结果,清晰标识成功和失败状态,点击具体测试项可查看更详细的报告。

通过 CLI 运行

当测试规模较大,或者需要在无图形界面的服务器上执行时,Apifox CLI 是更高效的选择。它是一个命令行工具,可以将 Apifox 中的测试能力延展到任何终端环境。

要使用 CLI 运行,首先需要安装 Apifox CLI,并确保其版本为最新。完成安装或升级后,可以在测试套件的「CI/CD」标签页中找到自动生成的命令行:

将这条命令复制到终端中执行,即可在命令行看到与图形界面一致的测试过程和结果。

运行结束后,它还会在当前目录下生成一个 apifox-reports/ 文件夹,里面包含了 HTML 格式的详细测试报告。

通过 CLI 运行的方式是实现 CI/CD 的基础。可以将这条命令集成到 Jenkins、GitLab CI 或 GitHub Actions 的脚本中,在代码合并等关键节点自动触发回归测试。

通过定时任务运行

Apifox 内置了「定时任务」功能。在测试套件的「定时任务」标签页,可以新建一个任务,设置其运行周期和运行环境。

与本地运行不同,定时任务需要指定在「自托管 Runner」上执行。

Runner 是一个可以由团队自行部署在内网服务器上的轻量级执行程序。使用 Runner 可以解决本地机器关机或网络不通导致定时任务失败的问题,并利用服务器更强大的计算资源来执行大规模测试。

设置好定时任务后,Apifox 会在指定时间自动调度 Runner 执行测试套件,并将运行历史和报告上传至云端。同时,可以配置失败通知,一旦线上接口出现异常,相关人员就能第一时间收到告警信息,及时介入处理。

总结

通过静态与动态两种编排模式,你既可以精确控制专项测试的执行范围,也能让回归测试随业务迭代自动更新,无需反复手动维护。配合本地运行、CLI 集成和定时任务等多种执行方式,测试套件可以灵活嵌入开发流程的各个环节——从开发阶段的快速验证,到 CI/CD 流水线的自动化回归,再到生产环境的定时巡检。

更多关于测试套件的知识可以前往 Apifox 帮助文档查看。现在就去试试创建你的第一个测试套件,将现有测试内容进行编排,逐步构建可持续运行的自动化回归体系。

不太懂 openspec ,最近了解了一下。自己在某个仓库使用 openspec 写了一次需求,也观察了一下 openspec 自身是怎么使用的,发现 openspec 的 spec 似乎就是在用自然语言描述代码逻辑行为的各种 case 。以 openspec 仓库自身的 spec 为例,发现 spec.md 文件本身是在描述代码的行为逻辑。比如 openspec 的某个 spec.md 对应行为就是这个文件: https://github.com/Fission-AI/OpenSpec/blob/8332a098118a6584a7104ccfe8e46669a1c24b7d/src/utils/change-utils.ts#L112spec.md 本身贴在末尾

我的问题是:

  1. 这样的 spec 存下来有什么意义?因为我理解存下来是为了后续有其他需求迭代时给 ai 看的,那为什么不直接让 ai 去读代码来理解现有的逻辑呢?我理解大型项目让 ai 工作是需要知识库的,但是 openspec 的 spec 更像一个细节说明书,而不是类似纲领的知识库。是不是说在 openspec 的工作流里面 spec 才是代码仓库的行为核心准则,理论上基于 spec.md 可以随时生成一套具体实现可能不一致,但行为一致的代码。
  2. openspec 的工作流程是先让 AI 进行 plan ,然后迭代 plan ,直到 AI 给出 plan 满意了,然后 AI 开始进行 coding 。这个 plan 的流程现在 antigravity 等也能做到。感觉 plan 并不是 openspec 的重点,spec.md 才是重点是吗?如果说期望的流程是先和 ai 讨论出充满细节的 spec ,再让 ai 开始 coding ,有种变成了自然语言描述写代码的感觉,这个感觉非常怪。
  3. 基于 1 的末尾提出的“spec 才是代码仓库的行为核心准则”的想法,假设需要落地到一个前端项目,那按照这个 spec 粒度,我感觉每个 tsx 文件都需要一个 spec 去描述它的规范行为。这个感觉就更怪了

有没有实践比较多的朋友能给一些输入,分享一些经验,或者思考?

附上的 spec.md

change-creation 规范

目的( Purpose )

提供用于以编程方式创建和校验 OpenSpec change 目录的工具函数。


需求( Requirements )

需求:Change 创建( Change Creation )

系统 必须( SHALL ) 提供一个函数,用于以编程方式创建新的 change 目录。

场景:创建 change

  • 当( WHEN ) 调用 createChange(projectRoot, 'add-auth')
  • 那么( THEN ) 系统会创建 openspec/changes/add-auth/ 目录

场景:拒绝重复的 change

  • 当( WHEN ) 调用 createChange(projectRoot, 'add-auth'),且 openspec/changes/add-auth/ 已存在
  • 那么( THEN ) 系统抛出一个错误,表明该 change 已存在

场景:必要时创建父目录

  • 当( WHEN ) 调用 createChange(projectRoot, 'add-auth'),且 openspec/changes/ 不存在
  • 那么( THEN ) 系统创建完整路径,包括所有必要的父目录

场景:拒绝非法的 change 名称

  • 当( WHEN ) 使用非法名称调用 createChange(projectRoot, 'Add Auth')
  • 那么( THEN ) 系统抛出一个校验错误


需求:Change 名称校验( Change Name Validation )

系统 必须( SHALL ) 校验 change 名称符合 kebab-case 规范。

场景:合法的 kebab-case 名称被接受

  • 当( WHEN ) 校验一个类似 add-user-auth 的名称
  • 那么( THEN ) 校验返回 { valid: true }

场景:允许数字后缀

  • 当( WHEN ) 校验一个类似 add-feature-2 的名称
  • 那么( THEN ) 校验返回 { valid: true }

场景:允许单个单词

  • 当( WHEN ) 校验一个类似 refactor 的名称
  • 那么( THEN ) 校验返回 { valid: true }

场景:拒绝大写字母

  • 当( WHEN ) 校验一个类似 Add-Auth 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

场景:拒绝空格

  • 当( WHEN ) 校验一个类似 add auth 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

场景:拒绝下划线

  • 当( WHEN ) 校验一个类似 add_auth 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

场景:拒绝特殊字符

  • 当( WHEN ) 校验一个类似 add-auth! 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

场景:拒绝以连字符开头

  • 当( WHEN ) 校验一个类似 -add-auth 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

场景:拒绝以连字符结尾

  • 当( WHEN ) 校验一个类似 add-auth- 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

场景:拒绝连续连字符

  • 当( WHEN ) 校验一个类似 add--auth 的名称
  • 那么( THEN ) 校验返回 { valid: false, error: "..." }

一、背景

得物经过10年发展,计算任务已超10万+,数据已经超200+PB,为了降低成本,计算引擎和存储资源需要从云平台迁移到得物自建平台,计算引擎从云平台Spark迁移到自建Apache Spark集群、存储从ODPS迁移到OSS。

在迁移时,最关键的一点是需要保证迁移前后数据的一致性,同时为了更加高效地完成迁移工作(目前计算任务已超10万+,手动比数已是不可能),因此比数平台便应运而生。

二、数据比对关键挑战与目标

关键挑战一:如何更快地完成全文数据比对

现状痛点:

在前期迁移过程中,迁移同学需要手动join两张表来识别不一致数据,然后逐条、逐字段进行人工比对验证。这种方式在任务量较少时尚可应付,但当任务规模达到成千上万级别时,就无法实现并发快速分析。

核心问题:

  • 效率瓶颈:每天需要完成数千任务的比对,累计待迁移任务达10万+,涉及表数十万张。
  • 扩展性不足:传统人工比对方式无法满足大规模并发处理需求。

关键挑战二:如何精准定位异常数据

现状痛点:

迁移同学在识别出不一致数据后,需要通过肉眼观察来定位具体问题,经常导致视觉疲劳和分析效率低下。

核心问题:

  • 分析困难:在比对不通过的情况下,比对人员需要人工分析失败原因。
  • 复杂度高:面对数据量庞大、加工逻辑复杂的场景,特别是在处理大JSON数据时,肉眼根本无法有效分辨差异。
  • 耗时严重:单次比对不通过场景的平均分析时间高达1.67小时/任务。

比数核心目标

基于以上挑战,数据比对系统需要实现以下核心目标:

  • 高并发处理能力:支持每天数千任务的快速比对,能够处理10万+待迁移任务和数十万张表的规模。
  • 自动化比对机制:实现全自动化的数据比对流程,减少人工干预,提升比对效率。
  • 智能差异定位:提供精准的差异定位能力,能够快速识别并高亮显示不一致的字段和数据。
  • 可视化分析界面:构建友好的可视化分析平台,支持大JSON数据的结构化展示和差异高亮。
  • 性能优化:将用户单次比对分析时间从小时级大幅缩短至分钟级别。
  • 可扩展架构:设计可水平扩展的系统架构,能够随着业务增长灵活扩容。

三、解决方案实现原理

快速完成全文数据比对方法

比数方法调研

待比对两表数据大小:300GB,计算资源:1000c


经过调研分析比数平台采用第二种和第三种相结合的方式进行比数。

先Union再分组数据一致性校验原理

假如我们有如下a和b两表张需要进行数据比对

表a:


表b:


表行数比较:

select count(1) from a ;
select count(1) from b ;

针对上面的查询结果,如果数量不一致则退出比对,待修复后重新比数;数量一致则继续字段值比较。

字段值比较:

第一步:union a 和 b

select 1 as _t1_count, 0 as _t2_count, `id`, `name`, `age`, `score`
from a
union all
select 0 as _t1_count, 1 as _t2_count, `id`, `name`, `age`, `score`
from b

第二步:sum(_t1_count),sum(_t2_count) 后分组

select sum(_t1_count) as sum_t1_count, sum(_t2_count) as sum_t2_count, `id`, `name`, `age`, `score`
from (
select 1 as _t1_count, 0 as _t2_count, `id`, `name`, `age`, `score`
from a
union all
select 0 as _t1_count, 1 as _t2_count, `id`, `name`, `age`, `score`
from b
) as union_table
group by `id`, `name`, `age`, `score`


第三步:把不一致数据写入新的表中(即上面表中sum_t1_count和sum_t2_count不相等的数据)

drop table if exists a_b_diff_20240908;
create table a_b_diff_20240908 as select * from (
select sum(_t1_count) as sum_t1_count, sum(_t2_count) as sum_t2_count, `id`, `name`, `age`, `score`
from (
select 1 as _t1_count, 0 as _t2_count, `id`, `name`, `age`, `score`
from a
union all
select 0 as _t1_count, 1 as _t2_count, `id`, `name`, `age`, `score`
from b
) as union_table
group by `id`, `name`, `age`, `score`
having sum(_t1_count) <> sum(_t2_count)
) as tmp

如果a_b_diff_20240908没有数据则两张表没有差异,比数通过,如有差异如下:

第四步:读取不一致记录表,根据主键(比如id)找出不一致字段并写到结果表中。

第五步:针对不一致字段的数据进行根因分析,如 json 、数组顺序问题、浮点数精度问题等,给出不一致具体原因。

哈希值聚合实现高效一致性校验

针对上面union后sum 再 group by 方式 在数据量大的时候还是非常耗资源和时间的,考虑到比数任务毕竟有70%都是一致的,所以我们可以先采用哈希值聚合比较两表的的值是否一致,使用这种高效的方法先把两表数据一致的任务过滤掉,剩下的再采用上面方法继续比较,因为还要找出是哪个字段哪里不一致。原理如下:

SELECT count (*),SUM(xxhash64(cloum1)^xxhash64(cloum2)^...) FROM tableA 
EXCEPT 
SELECT count(*),SUM(xxhash64(cloum1)^xxhash64(cloum2)^...) FROM tableB

如果有记录为空说明数据一致,不为空说明数据不一致需要采用上面提到union 分组的方法去找出具体字段哪里不一样。

通过哈希值聚合,单个任务比数时间从500s降低到160s,节省大约70%的时间。

找到两张表不一致数据后需要对两张的数据进行分析确定不一致的点在哪里?这里就需要知道表的主键,根据主键逐个比对两张表的其他字段,因此系统会先进行主键的自动探查,以及无主键的兜底处理。

精准定位异常数据实现方法

自动探查主键:实现原理如下

刚开始我们采用的前5个字段找主键的方式,如下:

针对表a的前5个字段 循环比对
select count(distinct id) from a 与 select count(1) from a 比较 ,如相等主键为id ,不相等继续往下执行
select count(distinct id,name) from a 与 select count(1) from a比较,如相等主键为id,name ,不相等继续往下执行
select count(distinct id,name,age) from a 与 select count(1) from a比较,如相等主键为id,name,age ,不相等继续往下执行,直到循环结束

采用上面的方法不一致任务中大约有49.6%任务自动探查主键失败:因此需重点提升主键识别能力。

针对以上主键探查成功率低的问题,后续进行了一些迭代,优化后的主键探查流程如下:

一、先采用sum(hash)高效计算方式进行探查:

1.先算出两张表每个字段的sum(hash)值  。

select sum(hash(id)),sum(hash(name)),sum(hash(age)),sum(hash(score)) from a 
union all 
select sum(hash(id)),sum(hash(name)),sum(hash(age)),sum(hash(score)) from b;

2.找出值相等的所有字段,本案例中为 id, name。

3.对id,name 可能是主键进一步确认,先进行行数校验,如 select count(distinct id,name) from a 的值等于select count(1) from a 则进一步校验,否则进入到第二种探查主键方式。

4.唯一性验证,如果值为0则表示探查主键成功,否则进入到第二种探查主键方式。

slect count(*) from ((select id,name from a ) expect (select id,name from b))

二、传统distinct方式探查:

针对表a的前N(所有字段数/2或者前N、后N等)个字段 循环比对:

1.select count(distinct id) from a与select count(1) from a比较 ,如相等主键为id ,不相等继续往下执行。

2.select count(distinct id,name) from a 与 select count(1) from a比较,如相等主键为id,name ,不相等继续往下执行。

3.select count(distinct id,name,age) from a 与 select count(1) from a比较,如相等主键为id,name,age ,不相等继续往下执行,直到循环结束。

三、全字段排序模拟:

如果上面两种方式还是没有找到主键则把不一致记录表进行全字段排序然后对第一条和第二条记录挨个字段进行分析,找出不一致内容,示例如下:

slect * from a_b_diff_20240908 order by id,name,age,score asc limit 10;


通过以上结果表可以得出两表的age字段不一致 ,score不一致(但按key排序后一致)。

如果以上自动化分析还是找不到不一致字段内容,可以人工确认表的主键后到平台手动指定主键字段,然后点击后续分析即可按指定主键去找字段不一致内容。

通过多次迭代优化找主键策略,找主键成功率从最初的50.4%提升到75%,加上全字段order by排序后最前两条数据进行分析,相当于可以把找主键的成功率提升到90%以上。

根因分析:实现原理如下

当数据不一致时,平台会根据主键找出两个表哪些字段数据不一致并进行分析,具体如下:

  • 精准定位: 明确指出哪条记录、哪个字段存在差异,并展示具体的源数据和目标数据值。
  • 智能根因分析: 内置了多种差异模式识别规则,能够自动分析并提示不一致的可能原因,例如:
  • 精度问题:如浮点数计算1.0000000001与1.0的差异。
  • JSON序列化差异:如{"a":1, "b":2}与{"b":2, "a":1},在语义一致的情况下,因键值对顺序不同而被标记为差异。同时系统会提示排序后一致。
  • 空值处理差异:如NULL值与空字符串""的差异判定。
  • 日期时区转换问题:时间戳在不同时区下表示不同。

  • 比对结果统计: 提供总数据量、一致数据量、不一致数据量及不一致率百分比,为项目决策提供清晰的量化依据。
  • 比数人员根据平台分析的差异原因,决定是否手动标记通过或进行任务修复。
  • 效果展示:

四、比数平台功能介绍

数据比对基本流程

任务生成:三种比对模式

  • 两表比对: 最直接的比对方式。用户只需指定源表与目标表,平台即可启动全量数据比对。它适用于临时比对的场景。
  • 任务节点比对: 一个任务可能输出多个表,逐一配置这些表的比对任务繁琐且易遗漏,任务节点比对模式完美解决了这一问题。用户只需提供任务节点ID,平台便会自动解析该节点对应的SQL代码,提取出所有输出表,并自动生成比对任务,极大地提升任务迁移比对效率。
  • SQL查询比对: 业务在进行SDK迁移只关心某些查询在迁移后数据是否一样,因此需要对用户提交的所有查询SQL进行比对,平台会分别在ODPS和Spark引擎上执行该查询,将结果集导出到两张临时表,再生成比对任务。

前置校验:提前发现问题

在启动耗时的全量比对之前,需要对任务进行前置校验,确保比对是在表结构一致、集群环境正常的情况下进行,否则一旦启动比数会占用大量计算资源,最后结果还是比数不通过,会影响比数平台整体的运行效率。因此比数平台一般会针对如下问题进行前置拦截。

  • 元数据一致性校验: 比对双方的字段名、字段类型、字段顺序、字段个数是否一致。
  • 函数缺失校验: 针对Spark引擎,校验SQL中使用的函数是否存在、是否能被正确识别,避免因函数不支持而导致的比对失败。
  • 语法问题校验: 分析SQL语句的语法结构,确保其在目标引擎中能够被顺利解析,避免使用了某些特定写法会导致数据出现不一致情况,提前发现语法层面问题,并对任务进行改写。

更多校验点如下:




通过增加以上前置校验拦截,比数任务数从每天3000+下降到1500+, 减少50% 的无效比数,其中UDF缺失最多,有效拦截任务1238,缺少函数87个(帮比数同学快速定位,一次性解决函数缺失问题,避免多次找引擎同学陆陆续续添加,节省双方时间成本)。

破解比数瓶颈:资源分配与任务调度优化

由于比数平台刚上线的时候只有计算迁移团队在使用,后面随着更多的团队开始使用,性能遇到了如下瓶颈:

1.资源不足问题: 不同业务(计算迁移、存储迁移、SDK迁移)的任务相互影响,基本比数任务与根因分析任务相互抢占资源。

2.任务编排不合理: 没有优先级导致大任务阻塞整体比数进程。

3.引擎参数设置不合理: 并行度不够、数据分块大小等高级参数。

针对以上问题比数平台进行了如下优化:

  • 按不同业务拆分成多个队列来运行,保证各个业务之间的比数任务可以同时进行,不会相互影响。
  • 根因分析使用单独的队列,与数据比对任务的队列分开,避免相互抢占资源发生“死锁”。
  • 相同业务内部按批次分时段、分优先级运行,保障重要任务优先进行比对。
  • 针对Spark引擎默认调优了公共参数、并支持用户自主设置其他高级参数。

通过以上优化达到到了如下效果:

  • 比数任务从每天22点完成提前至18点前,同时支持比数同学自主控制高优任务优先执行,方便比数同学及时处理不一致任务。
  • 通过优化资源队列使用方式,使系统找不到主键辅助用户自主找主键接口响应时间从58.5秒降到 26.2秒。

五、比数平台收益分享

平台持续安全运行500+天,每日可完成2000+任务比对,有效比数128万+次,0误判。

  • 助力计算迁移团队节省45+人日/月,完成数据分析、离线数仓空间任务的比对、交割。
  • 助力存储迁移团队完成20%+存储数据的迁移。
  • 助力引擎团队完成800+批次任务的回归验证,确保每一次引擎发布的安全及高效。
  • 助力SDK迁移团队完成80%+应用的迁移。

六、未来演进方向

接下来,平台计划在以下方面持续改进:

智能分析引擎: 针对Json复杂嵌套类型的字段接入大模型进行数据根因分析,找出不一致内容。

比对策略优化: 针对大表自动切分进行比对,降低比数过程出现因数据量大导致异常,进一步提升比对效率。

通用方案沉淀: 将典型的比对场景和解决方案能用化,应用到更多场景及团队中去。

七、结语

比数平台是得物在迁移过程中,为了应对海量任务、大数据量、字段内容复杂多样、异常数据难定位等挑战,确保业务迁移后数据准确而专门提供的解决方案,未来它不单纯是一个服务计算迁移、存储迁移、SDK迁移、Spark版本升级等需要的数据比对工具,而是演进为数据平台中不可或缺的基础设施。

往期回顾

1.得物App智能巡检技术的探索与实践

2.深度实践:得物算法域全景可观测性从 0 到 1 的演进之路 

3.前端平台大仓应用稳定性治理之路|得物技术

4.RocketMQ高性能揭秘:承载万亿级流量的架构奥秘|得物技术

5.PAG在得物社区S级活动的落地

文 /Galaxy平台

关注得物技术,每周更新技术干货

要是觉得文章对你有帮助的话,欢迎评论转发点赞~

未经得物技术许可严禁转载,否则依法追究法律责任。

AI生成代码质量难以把控!本文分享来自美团的技术实践,三大策略破解AI编程痛点。单测快速验证逻辑正确性,安全网保护存量代码演进,TDD模式精准传递需求。告别「看起来没问题」的错觉,构建AI时代的代码质量保障体系。

一、引言

目前,国内外很多AI Coding助手能在几秒钟内生成完整代码块,大大提升了开发效率,但这种高速开发模式也带来了潜在风险——与人工编码不同是,AI Coding助手生成代码存在两个特殊风险:其一,AI Coding助手依赖于上下文与模型自身的能力,输出的代码质量相对不可控。其二,AI生成的代码虽然逻辑通顺、结构完整,但可能隐藏着难以察觉的边界问题或逻辑缺陷。

核心问题:我们如何快速的验证AI生成代码的质量和可靠性?

本文旨在分享如何借助单元测试,让AI编程合作更高效可靠,主要解决三个常见痛点:

  1. 肉眼审查困境:AI一次性生成大量代码时,难以快速准确判断逻辑完备性;
  2. 存量代码信任危机:如何验证AI修改老代码时,不会产生非预期的结果;
  3. 需求传达难题:如何精准向AI表达复杂需求并快速验证。

针对上述三个常见痛点,本文提出采用不同的单元测试策略来应对以上问题。每个策略都针对一个特定痛点设计:策略一通过测试解决肉眼审查的局限性;策略二构建单测安全网应对存量代码的信任问题;策略三则采用TDD模式优化需求传达与验证流程。下文将依次展开说明,希望能对大家有所帮助或启发。

二、策略一:单测检验AI代码逻辑正确性

2.1 问题背景

传统的人工代码审查在AI生成的大量代码面前显得低效且不可靠。在软件测试实践中,有着测试左移(Shift Left Testing)的概念,本质上是借助工具和测试手段更早地发现问题和预防问题。在AI Coding时代,这一理念尤为关键:跳过单元测试直接集成测试看似”抄近路”,实则是将风险后置——开发阶段几分钟能发现的Bug,在集成测试环境可能需要较长定位修复,这中间包含了代码部署、环境准备、测试条件的准备、问题定位、开发人员修复、再次部署验证等一系列漫长的环节。

相比之下,单元测试具有独特的优势:它能够独立运行、快速验证结果,并且可以无限次重复执行。这种测试方式就像是为项目进行的一次性投资,却能为整个开发周期构建起一张可靠的“安全网”。它不仅能实时验证AI Coding生成的代码是否正确,更能持续保障未来代码的质量稳定性,让开发团队始终对代码库保持信心。

2.2 案例:分页查询接口的隐蔽Bug

任务背景:实现一个支持多条件筛选的复杂分页查询接口pageQueryRobot

AI生成了如下核心查询逻辑:

public List<AgentRobotE> pageQueryRobotsByCondition(List<Long> shopIds, String chatSceneCode,
        Boolean enabled, Integer pageNo, Integer pageSize) {
    // ... 前置校验代码 ...

    // 分页查询机器人基础信息
    int offset = (pageNo - 1) * pageSize;
    List<AgentRobotEntity> entities = robotIds.stream()
            .skip(offset)
            .limit(pageSize)
            .map(robotId -> agentRobotDAO.getRobotById(robotId, false))
            .filter(Objects::nonNull)
            // 问题代码:类型不匹配的隐蔽Bug
            .filter(entity -> enabled == null || Objects.equals(entity.getEnabled(), enabled ? 1 : 0))
            .filter(entity -> Objects.equals(entity.getChatSceneCode(), chatSceneCode))
            .collect(Collectors.toList());

    return entities.stream()
            .map(this::convertToModel)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
}

问题分析:这段代码看起来逻辑完整,但第8行的过滤逻辑包含了多个复杂元素:

  • 三元运算符 enabled ? 1 : 0
  • Objects.equals 的使用
  • Boolean到Integer的隐式逻辑转换

仅凭肉眼很难发现其中的类型不匹配问题。

单元测试发现问题:通过AI编写了17个全面的单元测试用例,覆盖:

  • 正常场景:各种有效参数组合
  • 边界场景:null值、空集合处理
  • 参数组合:enabled为true/false/null的不同情况
@Test
public void testPageQueryWhenEnabledIsTrue() {
    // arrange
    List<Long> shopIds = Arrays.asList(12345L, 67890L);
    String chatSceneCode = "SCENE_C";
    Boolean enabled = true;  // 测试enabled为true的情况

    // 模拟数据库返回的实体,enabled字段为Boolean类型
    AgentRobotEntity mockEntity = new AgentRobotEntity();
    mockEntity.setEnabled(true);  // 注意:这里是Boolean类型
    mockEntity.setChatSceneCode("SCENE_C");

    when(agentRobotDAO.getRobotById(anyLong(), eq(false))).thenReturn(mockEntity);

    // act
    List<AgentRobotE> result = repository.pageQueryRobotsByCondition(
        shopIds, chatSceneCode, enabled, 1, 10);

    // assert - 这个测试失败了!
    assertEquals(1, result.size());  // 期望返回1个结果,实际返回0个
}

测试运行结果:当enabled为true时测试失败!

问题定位:通过测试失败,快速定位到过滤逻辑的问题:

// 错误的逻辑:entity.getEnabled()返回Boolean类型,但与Integer比较
Objects.equals(entity.getEnabled(), enabled ? 1 : 0)
// 当enabled=true时,比较的是 Objects.equals(Boolean.TRUE, 1) -> false
// 当enabled=false时,比较的是 Objects.equals(Boolean.TRUE, 0) -> false

正确修复:

// 修复后:直接比较Boolean类型
.filter(entity -> enabled == null || Objects.equals(entity.getEnabled(), enabled))

意外收获:在审查测试覆盖的代码时,还发现了N+1查询的性能问题:

// 存在性能问题的代码
.map(robotId -> agentRobotDAO.getRobotById(robotId, false))  // 每个robotId单独查询

成果验证:修复后,所有17个单元测试用例全部通过,代码质量得到保障。

三、策略二:构建安全网保护存量代码

3.1 问题场景

AI对存量代码的修改挑战更大。AI看到的可能只是函数或类的局部,无法理解背后的业务规则和历史包袱。如何放心的让AI修改已有的代码?

在进行AI Coding前,需要确保旧有逻辑,处于单元测试的完全覆盖保护中,这就像在开启汽车的“自动辅助驾驶”功能前,必须先系好安全带一样。这条“安全带”就是我们完善的、可运行的单元测试集。

  • 快速验证,精准反馈:AI生成修改后的代码无需人工逐行对比,只需运行单元测试即可获得即时反馈。测试失败的用例直接揭示AI修改中存在的问题——要么触及了不应改动的逻辑,要么未能正确实现预期变更。这种反馈机制既高效又客观。
  • 清晰界定修改边界:单元测试结果帮助我们明确判断——AI的修改是否精准实现了目标?在引入新功能的同时是否完整保留了原有逻辑?通过区分预期内的失败(主动修改旧逻辑)和意外失败(破坏现有功能),我们获得了优化AI方案的明确方向,大幅提升了迭代效率。

3.2 案例:延迟回复策略的用户范围扩展

业务背景:需要将消息延迟回复服务从原来的平台A、平台B的用户扩展到平台C用户。

原始代码分析:

// TextDelayReplyStrategy.java 中的核心逻辑
private boolean needSkip(ChatHistoryE chatHistoryE) {
    UserDTO UserDTO = UserHelper.parseUser(chatHistoryE.getUserId());
    return MessageSendDirectionEnum.CLIENT_SEND.value != chatHistoryE.getMessageStatus()
               || MessageShieldEnum.RECEIVER_SHIELD.value == chatHistoryE.getShield()
               || UserDTO == null
               || !UserType.isLoginUser(UserDTO.getUserType());  // 关键判断逻辑
}

这个needSkip方法决定了哪些用户类型需要跳过延迟回复处理。原逻辑中,UserType.isLoginUser()只覆盖平台A、平台B的登录用户,不包括平台C用户。

修改前的安全网构建:

按照“分析-测试-实施-验证”方法论,首先完善单元测试:

// 针对现有逻辑的保护性测试
@Test
public void testNeedSkipWithAUser() {
    // 平台A用户不应被跳过
    ChatHistoryE chatHistory = buildChatHistory(A_USER_ID);
    assertFalse(strategy.needSkip(chatHistory));
}

@Test
public void testNeedSkipWithBUser() {
    // 平台B用户不应被跳过
    ChatHistoryE chatHistory = buildChatHistory(B_USER_ID);
    assertFalse(strategy.needSkip(chatHistory));
}

@Test
public void testNeedSkipWithCUser() {
    // 平台C在修改前应被跳过
    ChatHistoryE chatHistory = buildChatHistory(C_USER_ID);
    assertTrue(strategy.needSkip(chatHistory));  // 修改前的预期行为
}

@Test
public void testNeedSkipWithGuestUser() {
    // 游客用户应被跳过
    ChatHistoryE chatHistory = buildChatHistory(GUEST_USER_ID);
    assertTrue(strategy.needSkip(chatHistory));
}

运行基线测试:确保所有测试通过,建立基线状态

[INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0
[INFO] 所有现有逻辑测试通过,可以安全修改

AI辅助修改实施:

向AI提供需求:”将平台C用户也纳入延迟回复服务范围”

AI分析代码后给出修改方案:

// 修改后的代码
private boolean needSkip(ChatHistoryE chatHistoryE) {
    UserDTO UserDTO = UserHelper.parseUser(chatHistoryE.getUserId());
    return MessageSendDirectionEnum.CLIENT_SEND.value != chatHistoryE.getMessageStatus()
               || MessageShieldEnum.RECEIVER_SHIELD.value == chatHistoryE.getShield()
               || UserDTO == null
               || !UserType.isAorBorCLoginUser(UserDTO.getUserType());  // 扩展用户范围
}

验证阶段的精准反馈:

修改后运行测试集:

# 运行结果
[INFO] Tests run: 15, Failures: 1, Errors: 0, Skipped: 0
[ERROR] testNeedSkipWithCProviderUser: expected:<true> but was:<false>

结果分析:

✅ testNeedSkipWithAUser - 通过(平台A用户逻辑未变)
✅ testNeedSkipWithBUser - 通过(平台B用户逻辑未变)
❌ testNeedSkipWithCUser - 失败(平台C预期的变更)
✅ testNeedSkipWithGuestUser - 通过(游客用户逻辑未变)

更新期望值:

@Test
public void testNeedSkipWithCUser() {
    // 修改后:平台C不应被跳过
    ChatHistoryE chatHistory = buildChatHistory(C_USER_ID);
    assertFalse(strategy.needSkip(chatHistory));  // 更新期望值
}

最终验证:

[INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0
[INFO] 所有测试通过,修改安全完成

这种方法将开发者从“担心AI改坏代码”的不信任中解放出来,明确知道哪些功能被影响,哪些保持不变,实现安全、高效的存量代码演进。

四、策略三:TDD思想驱动AI开发

4.1 “先生成,后验证”的局限

前面两节所提到的策略可以归类为”先生成,后验证”,在一定的场景下仍然存在两个问题:

  • 提示词驱动:开发者反复修改自然语言描述,AI产出不确定,返工频繁;
  • 肉眼审查:生成测试用例仍然需要人工验证,一旦用例较多,效率依然低下。

4.2 TDD模式的革命性转变

TDD 核心理念:

  • 测试先行:先写测试,再写实现代码。
  • 小步快跑:以微小增量推进开发,每次只解决一个问题。
  • 设计驱动:测试即需求文档,驱动接口设计和代码结构。
  • 安全网:测试集提供即时反馈,支持安全重构。

整个开发过程严格遵循 Red -> Green -> Refactor 的循环。

  • 🔴 Red: 先编写一个失败的单元测试,用代码来定义我们期望实现的功能。
  • 🟢 Green: 编写最精简的业务代码,让测试恰好通过。
  • 🔵 Refactor: 在测试持续通过的前提下,重构优化代码的设计和质量。

借助测试驱动开发(TDD)思想,我们先为AI提供一份清晰、无歧义的“需求说明书”和“验收标准”,然后指导它进行代码的生成。这个过程的核心是“🔴 红-🟢 绿-🔵 重构”循环,它将我们的每一次的对话,都转化为一次可验证的、可累加的进步。采用“先验证,后实现”的红-绿-重构循环,将模糊的需求转化为精确的代码语言。

4.3 案例:优惠券使用规则引擎的复杂逻辑

业务需求:开发一个智能优惠券使用规则引擎,支持”多券叠加使用和最优组合推荐”

传统困难

  • 自然语言描述:“实现优惠券规则引擎,支持多种券类型的叠加使用,并智能推荐最优使用方案”
  • AI需要猜测:哪些券可以叠加?什么是“最优”?有哪些使用限制?
  • 反复沟通:多次修改提示词,AI理解仍然偏离业务实际

第一次尝试:AI理解为“简单累加所有优惠”

// AI第一次实现 - 过于简化
public BigDecimal calculateDiscount(Order order, List<Coupon> coupons) {
    return coupons.stream()
        .map(coupon -> coupon.getDiscountAmount())
        .reduce(BigDecimal.ZERO, BigDecimal::add);
}
// 问题:忽略了券的使用条件、互斥规则、叠加限制

第二次尝试:AI理解为“选择面额最大的券”

// AI第二次实现 - 逻辑错误
public List<Coupon> selectOptimalCoupons(Order order, List<Coupon> availableCoupons) {
    return availableCoupons.stream()
        .filter(coupon -> order.getTotalAmount().compareTo(coupon.getMinOrderAmount()) >= 0)
        .max(Comparator.comparing(Coupon::getDiscountAmount))
        .map(List::of)
        .orElse(Collections.emptyList());
}
// 问题:只考虑单券最大优惠,未考虑多券组合的更优效果

第三次尝试:AI尝试复杂逻辑但引入更多问题

// AI第三次实现 - 逻辑混乱
public CouponUsageResult applyCoupons(Order order, List<Coupon> coupons) {
    // 各种复杂的if-else嵌套,但缺乏清晰的业务规则
    // 没有处理券的互斥关系
    // 没有考虑计算顺序对最终优惠的影响
    // 边界条件处理不当
}

经过多轮提示词优化,每次都需要重新解释复杂的业务规则,仍不满足预期。

TDD方式的完整循环:

🔴 红色阶段:用测试定义需求

编写测试用例,精确定义复杂的业务规则:

@Test
public void testCouponUsageWithBasicStackingRules() {
    // 构造订单:总价100元,包含数码产品
    Order order = new Order()
        .setTotalAmount(new BigDecimal("100.00"))
        .addItem("数码产品", new BigDecimal("100.00"));
    
    // 构造可用优惠券
    List<Coupon> availableCoupons = Arrays.asList(
        new Coupon().setType("满减券").setCondition("满50减10").setDiscountAmount(new BigDecimal("10")),
        new Coupon().setType("打折券").setCondition("数码类9折").setDiscountRate(new BigDecimal("0.9")),
        new Coupon().setType("免邮券").setCondition("免运费").setDiscountAmount(new BigDecimal("5"))
    );
    
    // 期望结果:满减券和免邮券可叠加,打折券与满减券互斥,应选择最优组合
    CouponUsageResult result = CouponEngine.calculateOptimalUsage(order, availableCoupons);
    
    // 验证最优方案:使用打折券+免邮券 (90+0=90元,比满减券+免邮券的85元更优)
    assertEquals(2, result.getUsedCoupons().size());
    assertTrue(result.getUsedCoupons().stream().anyMatch(c -> "打折券".equals(c.getType())));
    assertTrue(result.getUsedCoupons().stream().anyMatch(c -> "免邮券".equals(c.getType())));
    assertEquals(new BigDecimal("95.00"), result.getFinalAmount()); // 100*0.9 + 0 - 5运费
}

@Test  
public void testCouponMutualExclusionRules() {
    Order order = new Order().setTotalAmount(new BigDecimal("200.00"));
    
    List<Coupon> availableCoupons = Arrays.asList(
        new Coupon().setType("满减券").setCondition("满100减30").setDiscountAmount(new BigDecimal("30")),
        new Coupon().setType("打折券").setCondition("全场8折").setDiscountRate(new BigDecimal("0.8")),
        new Coupon().setType("新用户专享").setCondition("首单5折").setDiscountRate(new BigDecimal("0.5"))
    );
    
    CouponUsageResult result = CouponEngine.calculateOptimalUsage(order, availableCoupons);
    
    // 验证互斥规则:新用户券与其他券互斥,且优惠最大,应该单独使用
    assertEquals(1, result.getUsedCoupons().size());
    assertEquals("新用户专享", result.getUsedCoupons().get(0).getType());
    assertEquals(new BigDecimal("100.00"), result.getFinalAmount()); // 200 * 0.5
}

@Test
public void testCouponUsageConditionValidation() {
    Order order = new Order()
        .setTotalAmount(new BigDecimal("30.00"))
        .setUserLevel("普通用户")
        .addItem("服装", new BigDecimal("30.00"));
    
    List<Coupon> availableCoupons = Arrays.asList(
        new Coupon().setCondition("满50减10"), // 不满足金额条件
        new Coupon().setCondition("VIP专享9折"), // 不满足用户等级条件  
        new Coupon().setCondition("数码类8折"), // 不满足品类条件
        new Coupon().setCondition("无门槛5元券").setDiscountAmount(new BigDecimal("5")) // 满足条件
    );
    
    CouponUsageResult result = CouponEngine.calculateOptimalUsage(order, availableCoupons);
    
    // 验证条件判断:只有无门槛券可用
    assertEquals(1, result.getUsedCoupons().size());
    assertEquals("无门槛5元券", result.getUsedCoupons().get(0).getCondition());
    assertEquals(new BigDecimal("25.00"), result.getFinalAmount());
}

运行测试:

[ERROR] testCouponUsageWithBasicStackingRules: 
Class CouponEngine doesn't exist
[ERROR] testCouponMutualExclusionRules:
Method calculateOptimalUsage not found

红色阶段成功:测试失败,但复杂的业务规则已经通过代码精确表达。

🟢 绿色阶段:AI精确实现

将失败的测试用例提供给AI:“请实现CouponEngine.calculateOptimalUsage方法,让这些测试通过”

AI分析测试用例后,精确理解了复杂的业务规则:

  1. 券类型包括满减券、打折券、免邮券、新用户券等
  2. 不同券有不同的叠加和互斥规则
  3. 需要验证使用条件(金额、用户等级、商品类别)
  4. 要计算最优的券使用组合

AI生成的实现方案:

public class CouponEngine {
    public static CouponUsageResult calculateOptimalUsage(Order order, List<Coupon> availableCoupons) {
        // 1. 过滤出满足使用条件的券
        List<Coupon> eligibleCoupons = availableCoupons.stream()
            .filter(coupon -> isEligible(order, coupon))
            .collect(Collectors.toList());
        
        // 2. 生成所有可能的券组合(考虑互斥规则)
        List<List<Coupon>> validCombinations = generateValidCombinations(eligibleCoupons);
        
        // 3. 计算每种组合的最终优惠,选择最优方案
        return validCombinations.stream()
            .map(combination -> calculateResult(order, combination))
            .min(Comparator.comparing(CouponUsageResult::getFinalAmount))
            .orElse(new CouponUsageResult(order.getTotalAmount(), Collections.emptyList()));
    }
    
    private static boolean isEligible(Order order, Coupon coupon) {
        String condition = coupon.getCondition();
        
        // 金额条件检查
        if (condition.contains("满") && condition.contains("减")) {
            BigDecimal minAmount = extractMinAmount(condition);
            if (order.getTotalAmount().compareTo(minAmount) < 0) return false;
        }
        
        // 用户等级检查
        if (condition.contains("VIP") && !"VIP用户".equals(order.getUserLevel())) {
            return false;
        }
        
        // 商品类别检查
        if (condition.contains("数码类") && !order.hasCategory("数码产品")) {
            return false;
        }
        
        return true;
    }
    
    private static List<List<Coupon>> generateValidCombinations(List<Coupon> coupons) {
        List<List<Coupon>> combinations = new ArrayList<>();
        
        // 生成所有子集组合
        for (int i = 0; i < Math.pow(2, coupons.size()); i++) {
            List<Coupon> combination = new ArrayList<>();
            for (int j = 0; j < coupons.size(); j++) {
                if ((i & (1 << j)) != 0) {
                    combination.add(coupons.get(j));
                }
            }
            
            // 检查组合是否符合互斥规则
            if (isValidCombination(combination)) {
                combinations.add(combination);
            }
        }
        
        return combinations;
    }
    
    private static boolean isValidCombination(List<Coupon> combination) {
        // 检查互斥规则
        boolean hasDiscountCoupon = combination.stream().anyMatch(c -> "打折券".equals(c.getType()));
        boolean hasReduceCoupon = combination.stream().anyMatch(c -> "满减券".equals(c.getType()));
        boolean hasNewUserCoupon = combination.stream().anyMatch(c -> "新用户专享".equals(c.getType()));
        
        // 新用户券与其他券互斥
        if (hasNewUserCoupon && combination.size() > 1) {
            return false;
        }
        
        // 打折券与满减券互斥
        if (hasDiscountCoupon && hasReduceCoupon) {
            return false;
        }
        
        return true;
    }
    
    private static CouponUsageResult calculateResult(Order order, List<Coupon> usedCoupons) {
        BigDecimal finalAmount = order.getTotalAmount();
        
        // 按照优先级顺序应用优惠
        for (Coupon coupon : usedCoupons) {
            if ("打折券".equals(coupon.getType())) {
                finalAmount = finalAmount.multiply(coupon.getDiscountRate());
            } else if ("满减券".equals(coupon.getType()) || "无门槛5元券".equals(coupon.getCondition())) {
                finalAmount = finalAmount.subtract(coupon.getDiscountAmount());
            } else if ("免邮券".equals(coupon.getType())) {
                // 免邮券单独处理运费
                finalAmount = finalAmount.subtract(coupon.getDiscountAmount());
            }
        }
        
        return new CouponUsageResult(finalAmount, usedCoupons);
    }
}

运行测试:

[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] All tests passed!

绿色阶段成功:测试通过,复杂的业务逻辑完全正确。

🔵 重构阶段:优化代码质量

在测试保护下,对AI生成的代码进行质量优化:

public class CouponEngine {
    // 提取常量,增强可读性
    private static final Set<String> MUTUALLY_EXCLUSIVE_TYPES = Set.of("打折券", "满减券");
    private static final String NEW_USER_COUPON_TYPE = "新用户专享";
    
    public static CouponUsageResult calculateOptimalUsage(Order order, List<Coupon> availableCoupons) {
        if (CollectionUtils.isEmpty(availableCoupons)) {
            return new CouponUsageResult(order.getTotalAmount(), Collections.emptyList());
        }
        
        // 使用策略模式优化条件验证
        List<Coupon> eligibleCoupons = availableCoupons.stream()
            .filter(coupon -> CouponValidator.isEligible(order, coupon))
            .collect(Collectors.toList());
        
        // 使用组合算法优化券组合生成
        List<List<Coupon>> validCombinations = CouponCombinator.generateValidCombinations(eligibleCoupons);
        
        // 使用计算引擎优化折扣计算
        return validCombinations.stream()
            .map(combination -> DiscountCalculator.calculateResult(order, combination))
            .min(Comparator.comparing(CouponUsageResult::getFinalAmount))
            .orElse(new CouponUsageResult(order.getTotalAmount(), Collections.emptyList()));
    }
}

// 职责分离:券验证器
class CouponValidator {
    public static boolean isEligible(Order order, Coupon coupon) {
        return AmountValidator.validate(order, coupon) &&
               UserLevelValidator.validate(order, coupon) &&
               CategoryValidator.validate(order, coupon);
    }
}

// 职责分离:券组合器
class CouponCombinator {
    public static List<List<Coupon>> generateValidCombinations(List<Coupon> coupons) {
        return PowerSetGenerator.generate(coupons).stream()
            .filter(MutualExclusionChecker::isValidCombination)
            .collect(Collectors.toList());
    }
}

// 职责分离:折扣计算器
class DiscountCalculator {
    public static CouponUsageResult calculateResult(Order order, List<Coupon> usedCoupons) {
        // 按优先级排序券,确保计算顺序正确
        List<Coupon> sortedCoupons = usedCoupons.stream()
            .sorted(Comparator.comparing(CouponPriorityResolver::getPriority))
            .collect(Collectors.toList());
        
        BigDecimal finalAmount = order.getTotalAmount();
        
        for (Coupon coupon : sortedCoupons) {
            finalAmount = applyCouponDiscount(finalAmount, coupon);
        }
        
        return new CouponUsageResult(finalAmount, usedCoupons);
    }
    
    private static BigDecimal applyCouponDiscount(BigDecimal currentAmount, Coupon coupon) {
        return CouponTypeHandler.getHandler(coupon.getType())
            .applyDiscount(currentAmount, coupon);
    }
}

重构验证:

[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] 重构完成,测试持续通过,代码结构更清晰,职责分离更明确

协作模式转变:开发者不再需要为如何描述复杂的业务规则而烦恼,现在只需专注于设计精确的测试场景——我们负责定义“做什么”和“预期结果”,而AI则负责实现具体的“怎么做”。这种明确的分工让复杂逻辑的开发变得既可控又高效。

通过这种方式,我们能够确保:

  1. 需求表达精准无歧义
  2. 边界条件全面覆盖
  3. 实现过程完全可控
  4. 重构过程安全可靠

当需要开发新场景时,只需新增测试用例即可,完全不必担心会破坏原有逻辑。这种开发模式不仅提升了效率,更确保了系统的稳定性和可维护性。

五、实践要点

5.1 环境配置

确保AI Agent能执行mvn test命令

设定明确的行为准则(Rule),让AI能够知道我们现在遵循的开发范式,防止AI为了通过测试”作弊”修改业务代码。一个借助TDD思想驱动代码生成的执行准则如下

# AI Agent 行为准则:TDD 测试驱动开发

## 1. 总则

### 1.1. 概述
为了确保 AI Agent 遵循 TDD(测试驱动开发)的开发模式,Agent 必须严格按照 **Red-Green-Refactor** 三个阶段的循环进行开发。在执行每个阶段前,Agent 必须向开发者明确声明其当前所处的阶段。

本准则旨在确保 Agent 遵循正确的 TDD 开发流程,避免跳过关键步骤。

### 1.2. 环境配置:强制使用指定的 settings.xml
**核心要求**: 所有对 `mvn@ 命令的调用(如 mvn test@, mvn compile@ 等),都**必须**使用 --settings@ (或 -s@) 参数来指定一个自定义的 settings.xml` 文件,以确保能够访问内部的 Maven 仓库。

- **命令格式示例**: `mvn --settings [settings.xml的绝对路径] test`
- **`settings.xml` 文件路径**: `[settings.xml的绝对路径]`

Agent 在执行任何 Maven 命令前,必须确认此路径已被正确配置和使用。

---

## 2. TDD 三阶段循环

### 2.1. 第一阶段:RED (写失败的测试)

#### 2.1.1. 目标
编写一个**必然失败**的测试用例,明确定义即将实现的功能需求。

#### 2.1.2. 核心准则
- **允许**: Agent 可以在 `src/test/` 目录下创建新的测试文件或添加新的测试方法
- **要求**:
  - 测试必须是失败的(因为对应的实现代码尚未存在或不完整)
  - 一次只测试一个功能点
  - 测试代码要简单清晰
  - 测试名称要明确表达测试意图
- **禁止**: Agent **不能**修改 `src/main/` 目录下的任何现有代码
- **验证**: 运行测试必须显示红色(失败状态)

#### 2.1.3. 交互示例
- **开发者提示**: "我需要实现一个计算器的加法功能"
- **Agent 回应**: "已激活 **RED 阶段**。我将先编写一个失败的测试用例来定义加法功能的需求。"

### 2.2. 第二阶段:GREEN (让测试通过的最简实现)

#### 2.2.1. 目标
编写**最简单**的实现代码,让当前失败的测试通过。

#### 2.2.2. 核心准则
- **允许**: Agent 可以创建、修改 `src/main/` 目录下的代码
- **要求**:
  - 优先考虑最简单的实现方式
  - 专注于满足当前测试用例
  - 快速实现功能让测试通过
- **禁止**:
  - **不能**修改测试代码
  - **不考虑**代码质量和性能优化
  - **不进行**过度设计
- **验证**: 运行测试必须显示绿色(通过状态)

#### 2.2.3. 交互示例
- **Agent 回应**: "已激活 **GREEN 阶段**。我将实现最简单的代码来让刚才的测试通过,不考虑优化和设计。"

### 2.3. 第三阶段:REFACTOR (重构优化)

#### 2.3.1. 目标
在保持测试通过的前提下,改进代码的设计、质量和可维护性。

#### 2.3.2. 核心准则
- **允许**: Agent 可以重构 `src/main/` 目录下的实现代码
- **要求**:
  - 改进代码设计和质量
  - 消除重复代码
  - 提高代码可读性和可维护性
  - 每次重构后必须运行测试确保通过
- **禁止**:
  - **不能**修改测试的行为和期望
  - **不能**破坏现有功能
- **验证**: 重构过程中和完成后,所有测试必须保持绿色

#### 2.3.3. 交互示例
- **Agent 回应**: "已激活 **REFACTOR 阶段**。我将重构代码以提高质量,同时确保所有测试保持通过状态。"

---

## 3. TDD 最佳实践

### 3.1. 循环节奏
- **小步快走**: 每个 Red-Green-Refactor 循环应该很短(几分钟到十几分钟)
- **频繁验证**: 每个阶段完成后都要运行测试验证
- **逐步推进**: 一次只关注一个小功能点

### 3.2. 测试质量要求
- **快速执行**: 单元测试应该在秒级内完成
- **独立性**: 测试之间不应该有依赖关系
- **可重复性**: 测试结果应该是确定的和可重复的
- **清晰命名**: 测试方法名应明确表达测试意图

### 3.3. 代码质量保证
- **持续重构**: 在每个循环的 REFACTOR 阶段改进代码
- **消除重复**: 遵循 DRY(Don't Repeat Yourself)原则
- **保持简洁**: 代码应该简洁明了,易于理解

### 3.4. 流程控制
Agent 在每个阶段转换时,必须:
1. 明确声明即将进入的阶段
2. 说明当前阶段的具体目标
3. 完成阶段后验证结果
4. 确认是否继续下一个循环

5.2 掌握单测语法

AI擅长基础用例覆盖,但复杂业务场景、边界条件仍有可能需要开发者手动编写。不要完全依赖AI构造用例。

5.3 选择合适场景与策略

快速决策法则:

  • 简单功能:单个方法,逻辑直观,采用“先实现,后验证”;
  • 复杂业务逻辑:多分支判断、算法计算、状态转换,采用TDD“先验证,后实现”;
  • 存量代码修改:采用“安全网保护”策略;
  • 提示词难以描述需求时:测试用例是最好的需求文档,采用TDD让代码直接表达需求。

5.4 持续维护

单元测试必须与业务代码演进保持同步。一个过时的、无人维护的测试集,其价值会迅速归零,甚至成为负资产。

六、结语

如今,单元测试已被赋予全新的意义——它不再被视为一种“开发负担”,而是进化成为AI Coding时代的“质量引擎”。

我们构建起三重关键保障:

  • 策略一:以客观检验替代主观判断,让AI代码告别“看起来没问题”的错觉;
  • 策略二:为存量代码筑起防护墙,使修改存量代码安全可控,降低演进风险;
  • 策略三:用测试作为与AI的沟通语言,精准传递复杂需求与预期。

更深层次的变化在于,我们正在重新定义开发者的核心价值:当我们从“思考提示词”转向“思考测试用例”,本质上是从AI代码被动的审查者,转变为了主动的需求设计者与质量掌控者。这不仅加速开发进程,更显著提升代码质量。这正是AI时代中,开发者与智能工具协同进化的优秀范式。

AI生成代码质量难以把控!本文分享来自美团的技术实践,三大策略破解AI编程痛点。单测快速验证逻辑正确性,安全网保护存量代码演进,TDD模式精准传递需求。告别「看起来没问题」的错觉,构建AI时代的代码质量保障体系。

一、引言

目前,国内外很多AI Coding助手能在几秒钟内生成完整代码块,大大提升了开发效率,但这种高速开发模式也带来了潜在风险——与人工编码不同是,AI Coding助手生成代码存在两个特殊风险:其一,AI Coding助手依赖于上下文与模型自身的能力,输出的代码质量相对不可控。其二,AI生成的代码虽然逻辑通顺、结构完整,但可能隐藏着难以察觉的边界问题或逻辑缺陷。

核心问题:我们如何快速的验证AI生成代码的质量和可靠性?

本文旨在分享如何借助单元测试,让AI编程合作更高效可靠,主要解决三个常见痛点:

  1. 肉眼审查困境:AI一次性生成大量代码时,难以快速准确判断逻辑完备性;
  2. 存量代码信任危机:如何验证AI修改老代码时,不会产生非预期的结果;
  3. 需求传达难题:如何精准向AI表达复杂需求并快速验证。

针对上述三个常见痛点,本文提出采用不同的单元测试策略来应对以上问题。每个策略都针对一个特定痛点设计:策略一通过测试解决肉眼审查的局限性;策略二构建单测安全网应对存量代码的信任问题;策略三则采用TDD模式优化需求传达与验证流程。下文将依次展开说明,希望能对大家有所帮助或启发。

二、策略一:单测检验AI代码逻辑正确性

2.1 问题背景

传统的人工代码审查在AI生成的大量代码面前显得低效且不可靠。在软件测试实践中,有着测试左移(Shift Left Testing)的概念,本质上是借助工具和测试手段更早地发现问题和预防问题。在AI Coding时代,这一理念尤为关键:跳过单元测试直接集成测试看似”抄近路”,实则是将风险后置——开发阶段几分钟能发现的Bug,在集成测试环境可能需要较长定位修复,这中间包含了代码部署、环境准备、测试条件的准备、问题定位、开发人员修复、再次部署验证等一系列漫长的环节。

相比之下,单元测试具有独特的优势:它能够独立运行、快速验证结果,并且可以无限次重复执行。这种测试方式就像是为项目进行的一次性投资,却能为整个开发周期构建起一张可靠的“安全网”。它不仅能实时验证AI Coding生成的代码是否正确,更能持续保障未来代码的质量稳定性,让开发团队始终对代码库保持信心。

2.2 案例:分页查询接口的隐蔽Bug

任务背景:实现一个支持多条件筛选的复杂分页查询接口pageQueryRobot

AI生成了如下核心查询逻辑:

public List<AgentRobotE> pageQueryRobotsByCondition(List<Long> shopIds, String chatSceneCode,
        Boolean enabled, Integer pageNo, Integer pageSize) {
    // ... 前置校验代码 ...

    // 分页查询机器人基础信息
    int offset = (pageNo - 1) * pageSize;
    List<AgentRobotEntity> entities = robotIds.stream()
            .skip(offset)
            .limit(pageSize)
            .map(robotId -> agentRobotDAO.getRobotById(robotId, false))
            .filter(Objects::nonNull)
            // 问题代码:类型不匹配的隐蔽Bug
            .filter(entity -> enabled == null || Objects.equals(entity.getEnabled(), enabled ? 1 : 0))
            .filter(entity -> Objects.equals(entity.getChatSceneCode(), chatSceneCode))
            .collect(Collectors.toList());

    return entities.stream()
            .map(this::convertToModel)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
}

问题分析:这段代码看起来逻辑完整,但第8行的过滤逻辑包含了多个复杂元素:

  • 三元运算符 enabled ? 1 : 0
  • Objects.equals 的使用
  • Boolean到Integer的隐式逻辑转换

仅凭肉眼很难发现其中的类型不匹配问题。

单元测试发现问题:通过AI编写了17个全面的单元测试用例,覆盖:

  • 正常场景:各种有效参数组合
  • 边界场景:null值、空集合处理
  • 参数组合:enabled为true/false/null的不同情况
@Test
public void testPageQueryWhenEnabledIsTrue() {
    // arrange
    List<Long> shopIds = Arrays.asList(12345L, 67890L);
    String chatSceneCode = "SCENE_C";
    Boolean enabled = true;  // 测试enabled为true的情况

    // 模拟数据库返回的实体,enabled字段为Boolean类型
    AgentRobotEntity mockEntity = new AgentRobotEntity();
    mockEntity.setEnabled(true);  // 注意:这里是Boolean类型
    mockEntity.setChatSceneCode("SCENE_C");

    when(agentRobotDAO.getRobotById(anyLong(), eq(false))).thenReturn(mockEntity);

    // act
    List<AgentRobotE> result = repository.pageQueryRobotsByCondition(
        shopIds, chatSceneCode, enabled, 1, 10);

    // assert - 这个测试失败了!
    assertEquals(1, result.size());  // 期望返回1个结果,实际返回0个
}

测试运行结果:当enabled为true时测试失败!

问题定位:通过测试失败,快速定位到过滤逻辑的问题:

// 错误的逻辑:entity.getEnabled()返回Boolean类型,但与Integer比较
Objects.equals(entity.getEnabled(), enabled ? 1 : 0)
// 当enabled=true时,比较的是 Objects.equals(Boolean.TRUE, 1) -> false
// 当enabled=false时,比较的是 Objects.equals(Boolean.TRUE, 0) -> false

正确修复:

// 修复后:直接比较Boolean类型
.filter(entity -> enabled == null || Objects.equals(entity.getEnabled(), enabled))

意外收获:在审查测试覆盖的代码时,还发现了N+1查询的性能问题:

// 存在性能问题的代码
.map(robotId -> agentRobotDAO.getRobotById(robotId, false))  // 每个robotId单独查询

成果验证:修复后,所有17个单元测试用例全部通过,代码质量得到保障。

三、策略二:构建安全网保护存量代码

3.1 问题场景

AI对存量代码的修改挑战更大。AI看到的可能只是函数或类的局部,无法理解背后的业务规则和历史包袱。如何放心的让AI修改已有的代码?

在进行AI Coding前,需要确保旧有逻辑,处于单元测试的完全覆盖保护中,这就像在开启汽车的“自动辅助驾驶”功能前,必须先系好安全带一样。这条“安全带”就是我们完善的、可运行的单元测试集。

  • 快速验证,精准反馈:AI生成修改后的代码无需人工逐行对比,只需运行单元测试即可获得即时反馈。测试失败的用例直接揭示AI修改中存在的问题——要么触及了不应改动的逻辑,要么未能正确实现预期变更。这种反馈机制既高效又客观。
  • 清晰界定修改边界:单元测试结果帮助我们明确判断——AI的修改是否精准实现了目标?在引入新功能的同时是否完整保留了原有逻辑?通过区分预期内的失败(主动修改旧逻辑)和意外失败(破坏现有功能),我们获得了优化AI方案的明确方向,大幅提升了迭代效率。

3.2 案例:延迟回复策略的用户范围扩展

业务背景:需要将消息延迟回复服务从原来的平台A、平台B的用户扩展到平台C用户。

原始代码分析:

// TextDelayReplyStrategy.java 中的核心逻辑
private boolean needSkip(ChatHistoryE chatHistoryE) {
    UserDTO UserDTO = UserHelper.parseUser(chatHistoryE.getUserId());
    return MessageSendDirectionEnum.CLIENT_SEND.value != chatHistoryE.getMessageStatus()
               || MessageShieldEnum.RECEIVER_SHIELD.value == chatHistoryE.getShield()
               || UserDTO == null
               || !UserType.isLoginUser(UserDTO.getUserType());  // 关键判断逻辑
}

这个needSkip方法决定了哪些用户类型需要跳过延迟回复处理。原逻辑中,UserType.isLoginUser()只覆盖平台A、平台B的登录用户,不包括平台C用户。

修改前的安全网构建:

按照“分析-测试-实施-验证”方法论,首先完善单元测试:

// 针对现有逻辑的保护性测试
@Test
public void testNeedSkipWithAUser() {
    // 平台A用户不应被跳过
    ChatHistoryE chatHistory = buildChatHistory(A_USER_ID);
    assertFalse(strategy.needSkip(chatHistory));
}

@Test
public void testNeedSkipWithBUser() {
    // 平台B用户不应被跳过
    ChatHistoryE chatHistory = buildChatHistory(B_USER_ID);
    assertFalse(strategy.needSkip(chatHistory));
}

@Test
public void testNeedSkipWithCUser() {
    // 平台C在修改前应被跳过
    ChatHistoryE chatHistory = buildChatHistory(C_USER_ID);
    assertTrue(strategy.needSkip(chatHistory));  // 修改前的预期行为
}

@Test
public void testNeedSkipWithGuestUser() {
    // 游客用户应被跳过
    ChatHistoryE chatHistory = buildChatHistory(GUEST_USER_ID);
    assertTrue(strategy.needSkip(chatHistory));
}

运行基线测试:确保所有测试通过,建立基线状态

[INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0
[INFO] 所有现有逻辑测试通过,可以安全修改

AI辅助修改实施:

向AI提供需求:”将平台C用户也纳入延迟回复服务范围”

AI分析代码后给出修改方案:

// 修改后的代码
private boolean needSkip(ChatHistoryE chatHistoryE) {
    UserDTO UserDTO = UserHelper.parseUser(chatHistoryE.getUserId());
    return MessageSendDirectionEnum.CLIENT_SEND.value != chatHistoryE.getMessageStatus()
               || MessageShieldEnum.RECEIVER_SHIELD.value == chatHistoryE.getShield()
               || UserDTO == null
               || !UserType.isAorBorCLoginUser(UserDTO.getUserType());  // 扩展用户范围
}

验证阶段的精准反馈:

修改后运行测试集:

# 运行结果
[INFO] Tests run: 15, Failures: 1, Errors: 0, Skipped: 0
[ERROR] testNeedSkipWithCProviderUser: expected:<true> but was:<false>

结果分析:

✅ testNeedSkipWithAUser - 通过(平台A用户逻辑未变)
✅ testNeedSkipWithBUser - 通过(平台B用户逻辑未变)
❌ testNeedSkipWithCUser - 失败(平台C预期的变更)
✅ testNeedSkipWithGuestUser - 通过(游客用户逻辑未变)

更新期望值:

@Test
public void testNeedSkipWithCUser() {
    // 修改后:平台C不应被跳过
    ChatHistoryE chatHistory = buildChatHistory(C_USER_ID);
    assertFalse(strategy.needSkip(chatHistory));  // 更新期望值
}

最终验证:

[INFO] Tests run: 15, Failures: 0, Errors: 0, Skipped: 0
[INFO] 所有测试通过,修改安全完成

这种方法将开发者从“担心AI改坏代码”的不信任中解放出来,明确知道哪些功能被影响,哪些保持不变,实现安全、高效的存量代码演进。

四、策略三:TDD思想驱动AI开发

4.1 “先生成,后验证”的局限

前面两节所提到的策略可以归类为”先生成,后验证”,在一定的场景下仍然存在两个问题:

  • 提示词驱动:开发者反复修改自然语言描述,AI产出不确定,返工频繁;
  • 肉眼审查:生成测试用例仍然需要人工验证,一旦用例较多,效率依然低下。

4.2 TDD模式的革命性转变

TDD 核心理念:

  • 测试先行:先写测试,再写实现代码。
  • 小步快跑:以微小增量推进开发,每次只解决一个问题。
  • 设计驱动:测试即需求文档,驱动接口设计和代码结构。
  • 安全网:测试集提供即时反馈,支持安全重构。

整个开发过程严格遵循 Red -> Green -> Refactor 的循环。

  • 🔴 Red: 先编写一个失败的单元测试,用代码来定义我们期望实现的功能。
  • 🟢 Green: 编写最精简的业务代码,让测试恰好通过。
  • 🔵 Refactor: 在测试持续通过的前提下,重构优化代码的设计和质量。

借助测试驱动开发(TDD)思想,我们先为AI提供一份清晰、无歧义的“需求说明书”和“验收标准”,然后指导它进行代码的生成。这个过程的核心是“🔴 红-🟢 绿-🔵 重构”循环,它将我们的每一次的对话,都转化为一次可验证的、可累加的进步。采用“先验证,后实现”的红-绿-重构循环,将模糊的需求转化为精确的代码语言。

4.3 案例:优惠券使用规则引擎的复杂逻辑

业务需求:开发一个智能优惠券使用规则引擎,支持”多券叠加使用和最优组合推荐”

传统困难

  • 自然语言描述:“实现优惠券规则引擎,支持多种券类型的叠加使用,并智能推荐最优使用方案”
  • AI需要猜测:哪些券可以叠加?什么是“最优”?有哪些使用限制?
  • 反复沟通:多次修改提示词,AI理解仍然偏离业务实际

第一次尝试:AI理解为“简单累加所有优惠”

// AI第一次实现 - 过于简化
public BigDecimal calculateDiscount(Order order, List<Coupon> coupons) {
    return coupons.stream()
        .map(coupon -> coupon.getDiscountAmount())
        .reduce(BigDecimal.ZERO, BigDecimal::add);
}
// 问题:忽略了券的使用条件、互斥规则、叠加限制

第二次尝试:AI理解为“选择面额最大的券”

// AI第二次实现 - 逻辑错误
public List<Coupon> selectOptimalCoupons(Order order, List<Coupon> availableCoupons) {
    return availableCoupons.stream()
        .filter(coupon -> order.getTotalAmount().compareTo(coupon.getMinOrderAmount()) >= 0)
        .max(Comparator.comparing(Coupon::getDiscountAmount))
        .map(List::of)
        .orElse(Collections.emptyList());
}
// 问题:只考虑单券最大优惠,未考虑多券组合的更优效果

第三次尝试:AI尝试复杂逻辑但引入更多问题

// AI第三次实现 - 逻辑混乱
public CouponUsageResult applyCoupons(Order order, List<Coupon> coupons) {
    // 各种复杂的if-else嵌套,但缺乏清晰的业务规则
    // 没有处理券的互斥关系
    // 没有考虑计算顺序对最终优惠的影响
    // 边界条件处理不当
}

经过多轮提示词优化,每次都需要重新解释复杂的业务规则,仍不满足预期。

TDD方式的完整循环:

🔴 红色阶段:用测试定义需求

编写测试用例,精确定义复杂的业务规则:

@Test
public void testCouponUsageWithBasicStackingRules() {
    // 构造订单:总价100元,包含数码产品
    Order order = new Order()
        .setTotalAmount(new BigDecimal("100.00"))
        .addItem("数码产品", new BigDecimal("100.00"));
    
    // 构造可用优惠券
    List<Coupon> availableCoupons = Arrays.asList(
        new Coupon().setType("满减券").setCondition("满50减10").setDiscountAmount(new BigDecimal("10")),
        new Coupon().setType("打折券").setCondition("数码类9折").setDiscountRate(new BigDecimal("0.9")),
        new Coupon().setType("免邮券").setCondition("免运费").setDiscountAmount(new BigDecimal("5"))
    );
    
    // 期望结果:满减券和免邮券可叠加,打折券与满减券互斥,应选择最优组合
    CouponUsageResult result = CouponEngine.calculateOptimalUsage(order, availableCoupons);
    
    // 验证最优方案:使用打折券+免邮券 (90+0=90元,比满减券+免邮券的85元更优)
    assertEquals(2, result.getUsedCoupons().size());
    assertTrue(result.getUsedCoupons().stream().anyMatch(c -> "打折券".equals(c.getType())));
    assertTrue(result.getUsedCoupons().stream().anyMatch(c -> "免邮券".equals(c.getType())));
    assertEquals(new BigDecimal("95.00"), result.getFinalAmount()); // 100*0.9 + 0 - 5运费
}

@Test  
public void testCouponMutualExclusionRules() {
    Order order = new Order().setTotalAmount(new BigDecimal("200.00"));
    
    List<Coupon> availableCoupons = Arrays.asList(
        new Coupon().setType("满减券").setCondition("满100减30").setDiscountAmount(new BigDecimal("30")),
        new Coupon().setType("打折券").setCondition("全场8折").setDiscountRate(new BigDecimal("0.8")),
        new Coupon().setType("新用户专享").setCondition("首单5折").setDiscountRate(new BigDecimal("0.5"))
    );
    
    CouponUsageResult result = CouponEngine.calculateOptimalUsage(order, availableCoupons);
    
    // 验证互斥规则:新用户券与其他券互斥,且优惠最大,应该单独使用
    assertEquals(1, result.getUsedCoupons().size());
    assertEquals("新用户专享", result.getUsedCoupons().get(0).getType());
    assertEquals(new BigDecimal("100.00"), result.getFinalAmount()); // 200 * 0.5
}

@Test
public void testCouponUsageConditionValidation() {
    Order order = new Order()
        .setTotalAmount(new BigDecimal("30.00"))
        .setUserLevel("普通用户")
        .addItem("服装", new BigDecimal("30.00"));
    
    List<Coupon> availableCoupons = Arrays.asList(
        new Coupon().setCondition("满50减10"), // 不满足金额条件
        new Coupon().setCondition("VIP专享9折"), // 不满足用户等级条件  
        new Coupon().setCondition("数码类8折"), // 不满足品类条件
        new Coupon().setCondition("无门槛5元券").setDiscountAmount(new BigDecimal("5")) // 满足条件
    );
    
    CouponUsageResult result = CouponEngine.calculateOptimalUsage(order, availableCoupons);
    
    // 验证条件判断:只有无门槛券可用
    assertEquals(1, result.getUsedCoupons().size());
    assertEquals("无门槛5元券", result.getUsedCoupons().get(0).getCondition());
    assertEquals(new BigDecimal("25.00"), result.getFinalAmount());
}

运行测试:

[ERROR] testCouponUsageWithBasicStackingRules: 
Class CouponEngine doesn't exist
[ERROR] testCouponMutualExclusionRules:
Method calculateOptimalUsage not found

红色阶段成功:测试失败,但复杂的业务规则已经通过代码精确表达。

🟢 绿色阶段:AI精确实现

将失败的测试用例提供给AI:“请实现CouponEngine.calculateOptimalUsage方法,让这些测试通过”

AI分析测试用例后,精确理解了复杂的业务规则:

  1. 券类型包括满减券、打折券、免邮券、新用户券等
  2. 不同券有不同的叠加和互斥规则
  3. 需要验证使用条件(金额、用户等级、商品类别)
  4. 要计算最优的券使用组合

AI生成的实现方案:

public class CouponEngine {
    public static CouponUsageResult calculateOptimalUsage(Order order, List<Coupon> availableCoupons) {
        // 1. 过滤出满足使用条件的券
        List<Coupon> eligibleCoupons = availableCoupons.stream()
            .filter(coupon -> isEligible(order, coupon))
            .collect(Collectors.toList());
        
        // 2. 生成所有可能的券组合(考虑互斥规则)
        List<List<Coupon>> validCombinations = generateValidCombinations(eligibleCoupons);
        
        // 3. 计算每种组合的最终优惠,选择最优方案
        return validCombinations.stream()
            .map(combination -> calculateResult(order, combination))
            .min(Comparator.comparing(CouponUsageResult::getFinalAmount))
            .orElse(new CouponUsageResult(order.getTotalAmount(), Collections.emptyList()));
    }
    
    private static boolean isEligible(Order order, Coupon coupon) {
        String condition = coupon.getCondition();
        
        // 金额条件检查
        if (condition.contains("满") && condition.contains("减")) {
            BigDecimal minAmount = extractMinAmount(condition);
            if (order.getTotalAmount().compareTo(minAmount) < 0) return false;
        }
        
        // 用户等级检查
        if (condition.contains("VIP") && !"VIP用户".equals(order.getUserLevel())) {
            return false;
        }
        
        // 商品类别检查
        if (condition.contains("数码类") && !order.hasCategory("数码产品")) {
            return false;
        }
        
        return true;
    }
    
    private static List<List<Coupon>> generateValidCombinations(List<Coupon> coupons) {
        List<List<Coupon>> combinations = new ArrayList<>();
        
        // 生成所有子集组合
        for (int i = 0; i < Math.pow(2, coupons.size()); i++) {
            List<Coupon> combination = new ArrayList<>();
            for (int j = 0; j < coupons.size(); j++) {
                if ((i & (1 << j)) != 0) {
                    combination.add(coupons.get(j));
                }
            }
            
            // 检查组合是否符合互斥规则
            if (isValidCombination(combination)) {
                combinations.add(combination);
            }
        }
        
        return combinations;
    }
    
    private static boolean isValidCombination(List<Coupon> combination) {
        // 检查互斥规则
        boolean hasDiscountCoupon = combination.stream().anyMatch(c -> "打折券".equals(c.getType()));
        boolean hasReduceCoupon = combination.stream().anyMatch(c -> "满减券".equals(c.getType()));
        boolean hasNewUserCoupon = combination.stream().anyMatch(c -> "新用户专享".equals(c.getType()));
        
        // 新用户券与其他券互斥
        if (hasNewUserCoupon && combination.size() > 1) {
            return false;
        }
        
        // 打折券与满减券互斥
        if (hasDiscountCoupon && hasReduceCoupon) {
            return false;
        }
        
        return true;
    }
    
    private static CouponUsageResult calculateResult(Order order, List<Coupon> usedCoupons) {
        BigDecimal finalAmount = order.getTotalAmount();
        
        // 按照优先级顺序应用优惠
        for (Coupon coupon : usedCoupons) {
            if ("打折券".equals(coupon.getType())) {
                finalAmount = finalAmount.multiply(coupon.getDiscountRate());
            } else if ("满减券".equals(coupon.getType()) || "无门槛5元券".equals(coupon.getCondition())) {
                finalAmount = finalAmount.subtract(coupon.getDiscountAmount());
            } else if ("免邮券".equals(coupon.getType())) {
                // 免邮券单独处理运费
                finalAmount = finalAmount.subtract(coupon.getDiscountAmount());
            }
        }
        
        return new CouponUsageResult(finalAmount, usedCoupons);
    }
}

运行测试:

[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] All tests passed!

绿色阶段成功:测试通过,复杂的业务逻辑完全正确。

🔵 重构阶段:优化代码质量

在测试保护下,对AI生成的代码进行质量优化:

public class CouponEngine {
    // 提取常量,增强可读性
    private static final Set<String> MUTUALLY_EXCLUSIVE_TYPES = Set.of("打折券", "满减券");
    private static final String NEW_USER_COUPON_TYPE = "新用户专享";
    
    public static CouponUsageResult calculateOptimalUsage(Order order, List<Coupon> availableCoupons) {
        if (CollectionUtils.isEmpty(availableCoupons)) {
            return new CouponUsageResult(order.getTotalAmount(), Collections.emptyList());
        }
        
        // 使用策略模式优化条件验证
        List<Coupon> eligibleCoupons = availableCoupons.stream()
            .filter(coupon -> CouponValidator.isEligible(order, coupon))
            .collect(Collectors.toList());
        
        // 使用组合算法优化券组合生成
        List<List<Coupon>> validCombinations = CouponCombinator.generateValidCombinations(eligibleCoupons);
        
        // 使用计算引擎优化折扣计算
        return validCombinations.stream()
            .map(combination -> DiscountCalculator.calculateResult(order, combination))
            .min(Comparator.comparing(CouponUsageResult::getFinalAmount))
            .orElse(new CouponUsageResult(order.getTotalAmount(), Collections.emptyList()));
    }
}

// 职责分离:券验证器
class CouponValidator {
    public static boolean isEligible(Order order, Coupon coupon) {
        return AmountValidator.validate(order, coupon) &&
               UserLevelValidator.validate(order, coupon) &&
               CategoryValidator.validate(order, coupon);
    }
}

// 职责分离:券组合器
class CouponCombinator {
    public static List<List<Coupon>> generateValidCombinations(List<Coupon> coupons) {
        return PowerSetGenerator.generate(coupons).stream()
            .filter(MutualExclusionChecker::isValidCombination)
            .collect(Collectors.toList());
    }
}

// 职责分离:折扣计算器
class DiscountCalculator {
    public static CouponUsageResult calculateResult(Order order, List<Coupon> usedCoupons) {
        // 按优先级排序券,确保计算顺序正确
        List<Coupon> sortedCoupons = usedCoupons.stream()
            .sorted(Comparator.comparing(CouponPriorityResolver::getPriority))
            .collect(Collectors.toList());
        
        BigDecimal finalAmount = order.getTotalAmount();
        
        for (Coupon coupon : sortedCoupons) {
            finalAmount = applyCouponDiscount(finalAmount, coupon);
        }
        
        return new CouponUsageResult(finalAmount, usedCoupons);
    }
    
    private static BigDecimal applyCouponDiscount(BigDecimal currentAmount, Coupon coupon) {
        return CouponTypeHandler.getHandler(coupon.getType())
            .applyDiscount(currentAmount, coupon);
    }
}

重构验证:

[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO] 重构完成,测试持续通过,代码结构更清晰,职责分离更明确

协作模式转变:开发者不再需要为如何描述复杂的业务规则而烦恼,现在只需专注于设计精确的测试场景——我们负责定义“做什么”和“预期结果”,而AI则负责实现具体的“怎么做”。这种明确的分工让复杂逻辑的开发变得既可控又高效。

通过这种方式,我们能够确保:

  1. 需求表达精准无歧义
  2. 边界条件全面覆盖
  3. 实现过程完全可控
  4. 重构过程安全可靠

当需要开发新场景时,只需新增测试用例即可,完全不必担心会破坏原有逻辑。这种开发模式不仅提升了效率,更确保了系统的稳定性和可维护性。

五、实践要点

5.1 环境配置

确保AI Agent能执行mvn test命令

设定明确的行为准则(Rule),让AI能够知道我们现在遵循的开发范式,防止AI为了通过测试”作弊”修改业务代码。一个借助TDD思想驱动代码生成的执行准则如下

# AI Agent 行为准则:TDD 测试驱动开发

## 1. 总则

### 1.1. 概述
为了确保 AI Agent 遵循 TDD(测试驱动开发)的开发模式,Agent 必须严格按照 **Red-Green-Refactor** 三个阶段的循环进行开发。在执行每个阶段前,Agent 必须向开发者明确声明其当前所处的阶段。

本准则旨在确保 Agent 遵循正确的 TDD 开发流程,避免跳过关键步骤。

### 1.2. 环境配置:强制使用指定的 settings.xml
**核心要求**: 所有对 `mvn@ 命令的调用(如 mvn test@, mvn compile@ 等),都**必须**使用 --settings@ (或 -s@) 参数来指定一个自定义的 settings.xml` 文件,以确保能够访问内部的 Maven 仓库。

- **命令格式示例**: `mvn --settings [settings.xml的绝对路径] test`
- **`settings.xml` 文件路径**: `[settings.xml的绝对路径]`

Agent 在执行任何 Maven 命令前,必须确认此路径已被正确配置和使用。

---

## 2. TDD 三阶段循环

### 2.1. 第一阶段:RED (写失败的测试)

#### 2.1.1. 目标
编写一个**必然失败**的测试用例,明确定义即将实现的功能需求。

#### 2.1.2. 核心准则
- **允许**: Agent 可以在 `src/test/` 目录下创建新的测试文件或添加新的测试方法
- **要求**:
  - 测试必须是失败的(因为对应的实现代码尚未存在或不完整)
  - 一次只测试一个功能点
  - 测试代码要简单清晰
  - 测试名称要明确表达测试意图
- **禁止**: Agent **不能**修改 `src/main/` 目录下的任何现有代码
- **验证**: 运行测试必须显示红色(失败状态)

#### 2.1.3. 交互示例
- **开发者提示**: "我需要实现一个计算器的加法功能"
- **Agent 回应**: "已激活 **RED 阶段**。我将先编写一个失败的测试用例来定义加法功能的需求。"

### 2.2. 第二阶段:GREEN (让测试通过的最简实现)

#### 2.2.1. 目标
编写**最简单**的实现代码,让当前失败的测试通过。

#### 2.2.2. 核心准则
- **允许**: Agent 可以创建、修改 `src/main/` 目录下的代码
- **要求**:
  - 优先考虑最简单的实现方式
  - 专注于满足当前测试用例
  - 快速实现功能让测试通过
- **禁止**:
  - **不能**修改测试代码
  - **不考虑**代码质量和性能优化
  - **不进行**过度设计
- **验证**: 运行测试必须显示绿色(通过状态)

#### 2.2.3. 交互示例
- **Agent 回应**: "已激活 **GREEN 阶段**。我将实现最简单的代码来让刚才的测试通过,不考虑优化和设计。"

### 2.3. 第三阶段:REFACTOR (重构优化)

#### 2.3.1. 目标
在保持测试通过的前提下,改进代码的设计、质量和可维护性。

#### 2.3.2. 核心准则
- **允许**: Agent 可以重构 `src/main/` 目录下的实现代码
- **要求**:
  - 改进代码设计和质量
  - 消除重复代码
  - 提高代码可读性和可维护性
  - 每次重构后必须运行测试确保通过
- **禁止**:
  - **不能**修改测试的行为和期望
  - **不能**破坏现有功能
- **验证**: 重构过程中和完成后,所有测试必须保持绿色

#### 2.3.3. 交互示例
- **Agent 回应**: "已激活 **REFACTOR 阶段**。我将重构代码以提高质量,同时确保所有测试保持通过状态。"

---

## 3. TDD 最佳实践

### 3.1. 循环节奏
- **小步快走**: 每个 Red-Green-Refactor 循环应该很短(几分钟到十几分钟)
- **频繁验证**: 每个阶段完成后都要运行测试验证
- **逐步推进**: 一次只关注一个小功能点

### 3.2. 测试质量要求
- **快速执行**: 单元测试应该在秒级内完成
- **独立性**: 测试之间不应该有依赖关系
- **可重复性**: 测试结果应该是确定的和可重复的
- **清晰命名**: 测试方法名应明确表达测试意图

### 3.3. 代码质量保证
- **持续重构**: 在每个循环的 REFACTOR 阶段改进代码
- **消除重复**: 遵循 DRY(Don't Repeat Yourself)原则
- **保持简洁**: 代码应该简洁明了,易于理解

### 3.4. 流程控制
Agent 在每个阶段转换时,必须:
1. 明确声明即将进入的阶段
2. 说明当前阶段的具体目标
3. 完成阶段后验证结果
4. 确认是否继续下一个循环

5.2 掌握单测语法

AI擅长基础用例覆盖,但复杂业务场景、边界条件仍有可能需要开发者手动编写。不要完全依赖AI构造用例。

5.3 选择合适场景与策略

快速决策法则:

  • 简单功能:单个方法,逻辑直观,采用“先实现,后验证”;
  • 复杂业务逻辑:多分支判断、算法计算、状态转换,采用TDD“先验证,后实现”;
  • 存量代码修改:采用“安全网保护”策略;
  • 提示词难以描述需求时:测试用例是最好的需求文档,采用TDD让代码直接表达需求。

5.4 持续维护

单元测试必须与业务代码演进保持同步。一个过时的、无人维护的测试集,其价值会迅速归零,甚至成为负资产。

六、结语

如今,单元测试已被赋予全新的意义——它不再被视为一种“开发负担”,而是进化成为AI Coding时代的“质量引擎”。

我们构建起三重关键保障:

  • 策略一:以客观检验替代主观判断,让AI代码告别“看起来没问题”的错觉;
  • 策略二:为存量代码筑起防护墙,使修改存量代码安全可控,降低演进风险;
  • 策略三:用测试作为与AI的沟通语言,精准传递复杂需求与预期。

更深层次的变化在于,我们正在重新定义开发者的核心价值:当我们从“思考提示词”转向“思考测试用例”,本质上是从AI代码被动的审查者,转变为了主动的需求设计者与质量掌控者。这不仅加速开发进程,更显著提升代码质量。这正是AI时代中,开发者与智能工具协同进化的优秀范式。

为了提高其软件系统的合规覆盖率,Meta 已经将大型语言模型应用于变异测试。这种方法将 LLM 生成的变异体(mutants)和测试集成到 Meta 的自动化合规加固(ACH)系统中,消除了传统变异测试在可扩展性和准确性方面的限制。该系统的目标是在满足合规义务的同时保持产品和服务的安全,帮助团队更高效地满足全球监管要求。

 

变异测试是故意在代码中引入一些小的变异体,并检查测试是否能够检测到它们,以此来评估测试套件的有效性。由于变异体数量过多、计算成本高昂且存在价值有限的等效变异体等因素,传统编译测试的应用有限。Meta 的方法是利用大型语言模型生成具备上下文感知能力的变异体以及对应的测试,从而降低噪声并使工程工作聚焦于高价值代码路径。

 

在没有 LLM 指引之前,变异测试依赖于基于规则的静态操作符。这些操作符会无差别地生成大量的变异体,其中许多在语义上与原始代码等价,压跨了测试基础设施和开发流程。

 

Meta的ACH系统使用 LLM 生成恰当的变异体和有针对性的测试,重点关注隐私、安全和监管问题。基于LLM的等价检测器会过滤掉多余的变异体,而测试生成器会生成单元测试,工程师可以进行审查但不需要手动编写,这显著降低了运营开销。Facebook、Instagram、WhatsApp 和 Meta 的可穿戴平台的早期部署产生了数万个变异体和数百个可执行的测试。

ACH 系统架构概览(图片来源:Meta技术博客

 

自从将研究成果纳入 ACH 以来,Meta 在FSE 2025EuroSTAR 2025大会上展示了他们的工作成果,即 LLM 如何帮助他们克服以前限制大规模变异测试的障碍。借助生成式 AI 更高效地生成测试用例,传统上用于评估测试质量的变异测试如今变得更具实用性和可扩展性。

 

正如 Meta 工程团队所强调的那样:

 

从 2024 年 10 月到 12 月,我们尝试在 Facebook、Instagram、WhatsApp 和 Meta 的可穿戴平台上部署了用于隐私测试的 ACH。在数千个变异体和生成的数百个测试中,隐私工程师接受了 73%的测试,其中 36%被判定为与隐私相关。

 

在 ACH 的基础上,Meta 推出了即时捕获测试(JiTTest)挑战赛,旨在探索大型语言模型在自动化软件测试中的应用。该系统会生成强化测试以防止回归问题,并生成捕获测试用于检测新代码或变更代码中的缺陷。它会在拉取请求进入生产环境前生成测试结果以供审核,这既解决了测试预言问题(Test Oracle Problem),又保留了人工监督环节。在 FSE 2025 大会上,Meta 发表了一篇论文,详细阐述了 JiTTest 挑战及其相关的开放研究课题。

 

Meta 表示,LLM 将耗时且容易出错的过程转变为更高效的系统,帮助简化并优化了合规和风险管理框架。正在进行的工作包括:将 ACH 扩展到隐私测试和 Kotlin 之外的更多领域和语言;通过微调和提示工程改进变异体生成;解决测试预言问题。Meta 还在研究开发人员如何与 LLM 生成的测试互动,以提升采用率和可用性。更多研究成果将在即将召开的会议中展示,包括Product@Scale

 

https://www.infoq.com/news/2026/01/meta-llm-mutation-testing/

整理 | 华卫

 

过去一周,工程界正反复剖析 Boris Cherny 在社交平台 X 上发布的一系列推文。Boris Cherny 是 Anthropic 公司 Claude Code 的创始人兼负责人,起初他只是随手分享个人终端配置,这条内容却演变成了一篇引爆行业的软件开发未来宣言,业内人士称其为这家初创公司发展历程中的 “分水岭时刻”。

 

开发者社区知名人士 Jeff Tang 写道:“作为程序员,若你还没研读这位 Claude Code 缔造者亲授的最佳实践,就已经落后了。” 另一位行业观察人士 Kyle McNease 则更进一步表示,凭借 Cherny 这次 “颠覆游戏规则的更新”,Anthropic 正 “火力全开”,或将迎来属于自己的 “ChatGPT 时刻”。

同时开 5 个智能体,把编程玩成了战略游戏

这份狂热源于一个看似矛盾的事实:Cherny 这套工作流异常简洁,却能让一名工程师发挥出小型工程团队的产出效能。有用户在 X 平台分享了实践这套工作流的感受,称其体验 “更像玩《星际争霸》”,而非传统编程。核心变化在于,开发者从敲代码语法转变为指挥自主协作的智能单元。

 

在 Cherny 的分享中,最令人震撼的一点在于:他从不采用线性模式编程。传统开发的 “核心循环” 流程是程序员编写一个函数、完成测试,再推进下一项任务。而 Cherny 却扮演着一支舰队指挥官的角色。

 

“我在终端里并行运行五个 Claude 智能体,把标签页按 1 到 5 编号,借助系统通知来判断哪个 Claude 需要输入指令。”Cherny 写道。通过调用 iTerm2 系统通知功能,Cherny 得以高效管理五条并行的工作流:一个智能体运行测试套件,另一个重构遗留模块,第三个则撰写文档。他还会在浏览器的 claude.ai 平台上同时运行 “5 到 10 个 Claude 智能体”,并通过 “传送” 指令,在网页端与本地设备之间切换任务会话。

 

Cherny 的另一个选择也出人意料:在这个极度追求低延迟的行业,他只使用 Anthropic 体量最大、运行速度最慢的模型 Opus 4.5。“我做任何任务都用开启思考模式的 Opus 4.5,它是我用过的最好用的编程模型。尽管它比 Sonnet 模型体积更大、速度更慢,但因为你需要对它的干预更少,而且它的工具调用能力更强,最终整体效率几乎总是比使用小型模型更高。”他解释道。

 

此外,Cherny 还详细介绍了他的团队如何解决 AI 的 “健忘症” 问题。标准的大语言模型在不同任务会话之间,无法 “记住” 某家公司特定的编程风格或架构决策。为解决这一难题,Cherny 的团队在代码仓库中维护着一个名为 CLAUDE.md 的共享文件。

 

“每当我们发现 Claude 执行任务出错,就把问题记录到 CLAUDE.md 里,这样 Claude 下次就不会再犯同样的错误了。” 他写道。这一做法让整个代码库变成了一个能够自我修正的有机体。当人类开发者审核代码合并请求、发现错误时,他们不仅会修正代码,还会标记相关问题,让 AI 更新自身的执行指令。

 

“每一次失误都能转化为一条规则。” 分析这一系列推文的产品负责人 Aakash Gupta 如此评价。

 Cherny 公开工作流:极致自动化改造重复性任务

业界对 Cherny 推文的热烈反响,标志着开发者对自身工作的认知正在发生关键性转变。长久以来,“AI 编程” 的定位不过是文本编辑器里的自动补全功能。而 Cherny 的实践证明,如今的 AI 已经可以成为驱动整个工作流程的 “操作系统”。

 

“我向 claude.ai/code 提交的每一处修改,都会通过 Claude 浏览器扩展程序完成自动测试。它会自动打开浏览器、测试用户界面,并反复迭代优化,直到代码正常运行,且用户体验达到预期标准。”Cherny 认为,无论是通过浏览器自动化、运行 Shell 命令,还是执行测试套件,赋予 AI 自主验证工作成果的能力能将最终产出质量提升 2~3 倍。智能体不只是编写代码,更能主动验证代码的有效性。

 

Cherny 借助斜杠指令(即已纳入项目代码库的自定义快捷指令),只需一次按键就能触发复杂操作流程。他重点介绍了一条名为 /commit-push-pr 的指令,这条指令他每天要调用数十次。无需手动输入 Git 命令、撰写提交说明、发起代码合并请求,智能体便能自主完成版本控制中的各项繁琐流程。

 

Cherny 还部署了子智能体来负责开发生命周期中的特定阶段。在核心开发工作完成后,他会启用代码简化智能体来优化架构;在正式发布任何内容之前,则会调用应用验证智能体执行端到端测试。

 

“如果你已是一名工程师,且渴望获得更强大的工作效能,一定要读读这篇分享。”Jeff Tang 在社交平台 X 上如此总结。还有观察人士盛赞,这套 “原生” 工作流的核心优势,在于将重复性任务进行了极致的自动化改造。

 

参考链接:

https://venturebeat.com/technology/the-creator-of-claude-code-just-revealed-his-workflow-and-developers-are

随着 Spring Boot 在微服务架构中的广泛应用,其暴露的安全漏洞也呈指数级增长。传统的手工测试在面对成百上千个端点时显得力不从心,而自动化工具往往缺乏对 Spring Boot 特有漏洞的深度支持。

SpringBootVul-GUI 的出现,填补了 Spring Boot 专项安全工具的空白,成为红队工程师和安全研究人员的效率倍增神器!

简介

SpringBootVul-GUI 是一款半自动化 Spring Boot 漏洞检测与利用工具,内置目前 Spring Boot 所有漏洞。它以图形化界面为核心,集成了目前主流的 Spring Boot 漏洞检测模块,实现了从信息收集到漏洞利用的一站式工作流。

关键特性:


📌 转载信息
原作者:
moximoxi
转载时间:
2026/1/5 13:05:17

项目介绍:

基于 Django、langgraph、langchain 开发的 AI 自动化测试平台。

目前功能:

1、需求文档智能评审并指正需要改正的问题。
2、AI 根据知识库和需求文档生成测试用例。
3、自然语言用例执行并生成对应的 playwright 自动化脚本。
4、执行用例时自动截图上传会平台。
5、批量执行功能测试用例和 playwright 脚本并生成对应的报告。

效果展示



如果项目对你有帮助,请帮我点点 star ~

项目地址: MGdaasLab/WHartTest: WHartTest 是基于 Django REST Framework 与现代大模型技术打造的 AI 驱动测试自动化平台。平台聚合自然语言理解、知识库检索与嵌入搜索能力,结合 LangChain 与 MCP(Model Context Protocol) 工具调用,实现从需求到可执行测试用例的自动化生成与管理,帮助测试团队提升效率与覆盖率。

声明:

本人纯编程小白,项目由本人提供思路,AI 负责功能代码实现。(也算是站在巨人的肩膀上了)


📌 转载信息
原作者:
duanxc
转载时间:
2025/12/30 18:07:51