标签 Playwright 下的文章

作者:辰泉

提示:本文是 AgentRun Browser Sandbox 快速上手实践指南的姊妹篇,专注于高级集成方案、生产环境的最佳实践、性能优化和部署策略。如果您还没有完成基础学习,请先阅读《快速上手:LangChain + AgentRun 浏览器沙箱极简集成指南》

前言

在完成了 Browser Sandbox 的基础集成之后,本文将介绍高级集成方案(如 BrowserUse 框架)以及生产环境部署需要考虑的因素:如何管理 Sandbox 生命周期?如何优化性能和成本?如何保证系统的安全性和可观测性?本文将为您提供全面的高级应用和生产环境最佳实践指南。

基于 BrowserUse 集成 Browser Sandbox

image

效果截图

BrowserUse 是一个专门为 AI Agent 设计的浏览器自动化框架,支持视觉理解和智能决策。通过 AgentRun Browser Sandbox,您可以让 BrowserUse 在云端运行,享受 Serverless 架构的优势。

BrowserUse 架构概览

下图展示了 BrowserUse 与 Browser Sandbox 的集成架构:

image

架构特点:

  1. 智能决策循环: Agent 通过 LLM 分析页面截图,基于视觉理解生成操作指令,执行操作后继续循环,直到任务完成
  2. 无头浏览器控制: 通过 CDP 协议远程控制云端浏览器,Playwright 作为底层驱动,所有操作在云端执行
  3. 实时可视化: VNC 提供实时画面监控,方便调试和验证 Agent 行为

快速开始

安装依赖

pip install browser-use python-dotenv agentrun-sdk[playwright,server]

主要依赖说明:

  • browser-use:BrowserUse 核心库,支持多模态 LLM
  • agentrun-sdk[playwright,server]:AgentRun SDK,用于创建 Sandbox
  • python-dotenv:环境变量管理

配置环境变量

创建 .env 文件:

# DashScope API Key(用于 Qwen 模型)
DASHSCOPE_API_KEY=sk-your-dashscope-api-key
# AgentRun 认证信息
AGENTRUN_ACCOUNT_ID=your-account-id
ALIBABA_CLOUD_ACCESS_KEY_ID=your-access-key-id
ALIBABA_CLOUD_ACCESS_KEY_SECRET=your-access-key-secret
# Browser Sandbox 模板名称
BROWSER_TEMPLATE_NAME=sandbox-browser-demo

创建 Sandbox 并使用 BrowserUse

import asyncio
import os
from agentrun.sandbox import Sandbox, TemplateType
from browser_use import Agent, BrowserSession, ChatOpenAI
from browser_use.browser import BrowserProfile
from dotenv import load_dotenv
load_dotenv()
async def main():
    # 创建 Browser Sandbox
    sandbox = Sandbox.create(
        template_type=TemplateType.BROWSER,
        template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
        sandbox_idle_timeout_seconds=3000
    )
    # 配置 Qwen 多模态模型
    llm = ChatOpenAI(
        model='qwen-vl-max',
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
    )
    # 创建浏览器会话
    browser_session = BrowserSession(
        cdp_url=sandbox.get_cdp_url(),
        browser_profile=BrowserProfile(
            headless=False,
            timeout=3000000,
            keep_alive=True
        )
    )
    # 创建 Agent 并执行任务
    agent = Agent(
        task="访问阿里云官网并总结主要产品分类",
        llm=llm,
        browser_session=browser_session,
        use_vision=True
    )
    result = await agent.run()
    print(f"任务结果: {result.final_result()}")
    # 清理资源
    await browser_session.stop()
    sandbox.delete()
if __name__ == "__main__":
    asyncio.run(main())

BrowserUse 高级配置

自定义浏览器行为

browser_profile = BrowserProfile(
    timeout=3000000,             # 超时时间(毫秒)
    keep_alive=True,             # 保持会话活跃
)

多步骤任务编排

async def complex_task():
    """复杂的多步骤任务"""
    sandbox = Sandbox.create(
        template_type=TemplateType.BROWSER,
        template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
        sandbox_idle_timeout_seconds=3000
    )
    llm = ChatOpenAI(
        model='qwen-vl-max',
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
    )
    browser_session = BrowserSession(
        cdp_url=sandbox.cdp_url,
        browser_profile=BrowserProfile(keep_alive=True)
    )
    # 任务 1:信息收集
    agent1 = Agent(
        task="访问阿里云官网,收集产品分类信息",
        llm=llm,
        browser_session=browser_session,
        use_vision=True
    )
    result1 = await agent1.run()
    # 任务 2:基于第一步结果继续操作
    agent2 = Agent(
        task=f"基于以下信息:{result1.final_result()},访问每个产品分类并提取关键特性",
        llm=llm,
        browser_session=browser_session,
        use_vision=True
    )
    result2 = await agent2.run()
    # 清理资源
    await browser_session.stop()
    sandbox.delete()
    return result2.final_result()

集成 VNC 实时监控

import webbrowser
import urllib.parse
async def run_with_vnc_monitoring():
    """运行 BrowserUse 并启用 VNC 监控"""
    sandbox = Sandbox.create(
        template_type=TemplateType.BROWSER,
        template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
        sandbox_idle_timeout_seconds=3000
    )
    # 获取 VNC URL 并打开查看器
    vnc_url = sandbox.get_vnc_url(),
    if vnc_url:
        # 修复 VNC URL 路径
        if vnc_url.endswith('/vnc'):
            vnc_url = vnc_url[:-4] + '/ws/livestream'
        # 在浏览器中打开 VNC 查看器
        encoded_url = urllib.parse.quote(vnc_url, safe='')
        viewer_url = f"file://path/to/vnc-viewer.html?url={encoded_url}"
        webbrowser.open(viewer_url)
        print(f"VNC 查看器已打开,可实时监控浏览器操作")
    # 创建并运行 Agent
    llm = ChatOpenAI(
        model='qwen-vl-max',
        api_key=os.getenv("DASHSCOPE_API_KEY"),
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
    )
    browser_session = BrowserSession(
        cdp_url=sandbox.get_cdp_url(),
        browser_profile=BrowserProfile(headless=False, keep_alive=True)
    )
    agent = Agent(
        task="访问淘宝首页并搜索商品",
        llm=llm,
        browser_session=browser_session,
        use_vision=True
    )
    result = await agent.run()
    # 清理资源
    await browser_session.stop()
    sandbox.delete()
    return result.final_result()

BrowserUse 最佳实践

  1. 启用视觉理解: 对于复杂页面,使用 use_vision=True 让 LLM 分析页面截图
  2. 保持会话活跃: 使用 keep_alive=True 避免频繁重建连接
  3. 合理设置超时: 根据任务复杂度调整 timeout 参数
  4. 复用 BrowserSession: 对于多步骤任务,复用同一个 BrowserSession 提高效率
  5. 结合 VNC 调试: 开发阶段启用 VNC 实时查看 Agent 行为

获取完整示例代码

本文中的所有示例代码都可以在以下仓库中找到:

# 克隆示例代码仓库
git clone https://github.com/devsapp/agentrun-sandbox-demos.git
# 进入项目目录
cd agentrun-browseruse-wth-sandbox-demo
# 安装依赖(注意需要安装 server 扩展)
pip install -r requirements.txt

配置环境变量

# 复制环境变量模板
cp env.example .env
# 编辑 .env 文件,填入您的配置信息
# 必需配置项:
# - DASHSCOPE_API_KEY: DashScope API Key(用于 Qwen 模型)
# - AGENTRUN_ACCOUNT_ID: AgentRun 账号 ID
# - ALIBABA_CLOUD_ACCESS_KEY_ID: 阿里云访问密钥 ID
# - ALIBABA_CLOUD_ACCESS_KEY_SECRET: 阿里云访问密钥 Secret
# - BROWSER_TEMPLATE_NAME: Browser Sandbox 模板名称

运行示例(两步运行设计)

本项目采用服务器-客户端的架构设计,需要分两步运行:

第一步:启动 VNC 查看器服务

# 在终端 1 中启动 VNC Web 服务器
python main.py
# 服务启动后会显示:
# VNC 查看器服务已启动: http://localhost:8000
# 访问 http://localhost:8000 可以实时查看浏览器操作

main.py 的作用:

  • 启动本地 Web 服务器,提供 VNC 实时查看界面
  • 提供 WebSocket 代理,连接 AgentRun Sandbox 的 VNC 服务
  • 允许您在浏览器中实时监控 Agent 的操作过程

第二步:运行 BrowserUse 示例

# 在终端 2 中运行示例代码
python examples/01_browseruse_basic.py
# 运行高级示例
python examples/02_browseruse_advanced.py

为什么需要两步运行?

  1. 实时监控: main.py 提供 VNC 查看器,可以实时看到 Agent 在浏览器中的操作
  2. 调试友好: 通过可视化界面,更容易理解 Agent 的决策过程和行为
  3. 服务解耦: VNC 服务和业务逻辑分离,可以同时运行多个示例而共用同一个查看器

运行流程图:

image

仓库内容包括:

  • main.py:VNC Web 服务器,用于实时监控
  • examples/01_browseruse_basic.py:基础集成示例
  • examples/02_browseruse_advanced.py:高级配置示例
  • examples/sandbox_manager.py:Sandbox 生命周期管理
  • vncviewer/:VNC 查看器前端和后端代码
  • 完整的环境配置和最佳实践代码

Sandbox 生命周期管理最佳实践

三种管理模式

根据不同的应用场景,我们推荐三种 Sandbox 管理模式:

image

方案对比:

image

单例模式实现

适合开发调试和多轮对话场景:

class SandboxManager:
    """单例模式 Sandbox 管理器"""
    _instance = None
    _sandbox = None
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance
    def get_or_create(self):
        """获取或创建 Sandbox"""
        if self._sandbox is None:
            self._sandbox = Sandbox.create(
                template_type=TemplateType.BROWSER,
                template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
                sandbox_idle_timeout_seconds=3000
            )
        return self._sandbox
    def destroy(self):
        """销毁 Sandbox"""
        if self._sandbox:
            self._sandbox.delete()
            self._sandbox = None
# 使用
manager = SandboxManager()
sandbox = manager.get_or_create()  # 首次创建
sandbox = manager.get_or_create()  # 复用现有实例

连接池模式实现

适合高并发生产环境:

from queue import Queue
from threading import Lock
class SandboxPool:
    """Sandbox 连接池"""
    def __init__(self, pool_size=5, max_idle_time=300):
        self.pool_size = pool_size
        self.max_idle_time = max_idle_time
        self.pool = Queue(maxsize=pool_size)
        self.lock = Lock()
        self._initialize_pool()
    def _initialize_pool(self):
        """初始化连接池"""
        for _ in range(self.pool_size):
            sandbox = self._create_sandbox()
            self.pool.put(sandbox)
    def _create_sandbox(self):
        """创建 Sandbox 实例"""
        return Sandbox.create(
            template_type=TemplateType.BROWSER,
            template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
            sandbox_idle_timeout_seconds=self.max_idle_time
        )
    def acquire(self, timeout=30):
        """获取 Sandbox 实例"""
        try:
            sandbox = self.pool.get(timeout=timeout)
            if not self._is_alive(sandbox):
                sandbox = self._create_sandbox()
            return sandbox
        except:
            raise RuntimeError("获取 Sandbox 超时")
    def release(self, sandbox):
        """归还 Sandbox 实例"""
        if self._is_alive(sandbox):
            self.pool.put(sandbox)
        else:
            new_sandbox = self._create_sandbox()
            self.pool.put(new_sandbox)
    def _is_alive(self, sandbox):
        """检查 Sandbox 是否存活"""
        try:
            return hasattr(sandbox, 'sandbox_id')
        except:
            return False
# 使用
pool = SandboxPool(pool_size=5)
sandbox = pool.acquire()
try:
    # 使用 sandbox 执行任务
    pass
finally:
    pool.release(sandbox)

会话状态管理

支持多用户多会话场景:

import time
class SessionManager:
    """会话状态管理"""
    def __init__(self):
        self.sessions = {}  # session_id -> sandbox
    def create_session(self, session_id: str):
        """创建会话"""
        if session_id not in self.sessions:
            sandbox = Sandbox.create(
                template_type=TemplateType.BROWSER,
                template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
                sandbox_idle_timeout_seconds=1800
            )
            self.sessions[session_id] = {
                'sandbox': sandbox,
                'created_at': time.time(),
                'last_used': time.time()
            }
        return self.sessions[session_id]['sandbox']
    def get_session(self, session_id: str):
        """获取会话"""
        if session_id in self.sessions:
            session = self.sessions[session_id]
            session['last_used'] = time.time()
            return session['sandbox']
        return None
    def cleanup_expired_sessions(self, max_idle_time=1800):
        """清理过期会话"""
        current_time = time.time()
        expired_sessions = []
        for session_id, session in self.sessions.items():
            if current_time - session['last_used'] > max_idle_time:
                expired_sessions.append(session_id)
        for session_id in expired_sessions:
            self.destroy_session(session_id)
    def destroy_session(self, session_id: str):
        """销毁会话"""
        if session_id in self.sessions:
            self.sessions[session_id]['sandbox'].delete()
            del self.sessions[session_id]

性能优化

超时时间配置

合理设置超时时间是平衡性能和成本的关键:

# 开发环境(调试用)
sandbox = Sandbox.create(
    template_name="dev-template",
    sandbox_idle_timeout_seconds=7200  # 2 小时
)
# 生产环境(单次任务)
sandbox = Sandbox.create(
    template_name="prod-template",
    sandbox_idle_timeout_seconds=300  # 5 分钟
)
# 长时间任务
sandbox = Sandbox.create(
    template_name="long-task-template",
    sandbox_idle_timeout_seconds=10800  # 3 小时
)

超时策略推荐:

image

Sandbox 复用策略

class SmartSandboxManager:
    """智能 Sandbox 复用管理器"""
    def __init__(self):
        self.sandboxes = {}  # key -> sandbox
        self.usage_count = {}  # key -> count
    def get_sandbox(self, user_id: str, session_id: str):
        """获取或创建 Sandbox(支持复用)"""
        key = f"{user_id}:{session_id}"
        if key not in self.sandboxes:
            self.sandboxes[key] = Sandbox.create(
                template_type=TemplateType.BROWSER,
                template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
                sandbox_idle_timeout_seconds=1800
            )
            self.usage_count[key] = 0
        self.usage_count[key] += 1
        return self.sandboxes[key]
    def should_recreate(self, key: str, max_reuse=50):
        """判断是否需要重建(防止状态累积)"""
        return self.usage_count.get(key, 0) >= max_reuse
    def recreate_if_needed(self, key: str):
        """按需重建 Sandbox"""
        if self.should_recreate(key):
            if key in self.sandboxes:
                self.sandboxes[key].delete()
                del self.sandboxes[key]
                self.usage_count[key] = 0

错误处理和重试机制

使用 tenacity 库实现智能重试:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
class SandboxError(Exception):
    """Sandbox 操作异常"""
    pass
@retry(
    retry=retry_if_exception_type(SandboxError),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def execute_with_retry(sandbox, operation):
    """带重试的操作执行"""
    try:
        return operation(sandbox)
    except ConnectionError:
        raise SandboxError("连接失败")
    except TimeoutError:
        raise SandboxError("操作超时")
    except Exception as e:
        print(f"操作失败: {e}")
        raise SandboxError(f"操作失败: {e}")
# 使用示例
def navigate_page(sandbox):
    with sync_playwright() as p:
        browser = p.chromium.connect_over_cdp(sandbox.cdp_url)
        page = browser.contexts[0].pages[0]
        page.goto("https://example.com", timeout=30000)
        return page.title()
result = execute_with_retry(sandbox, navigate_page)

安全性最佳实践

环境变量保护

import os
from dotenv import load_dotenv
load_dotenv()
# 验证必需的环境变量
required_vars = ["DASHSCOPE_API_KEY", "AGENTRUN_ACCOUNT_ID"]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
    raise ValueError(f"缺少必需的环境变量: {', '.join(missing_vars)}")
# 敏感信息不要硬编码
API_KEY = os.getenv("DASHSCOPE_API_KEY")
ACCESS_KEY_ID = os.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
ACCESS_KEY_SECRET = os.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

URL 白名单

ALLOWED_DOMAINS = [
    'example.com',
    'aliyun.com',
    'alibaba.com'
]
def is_url_allowed(url: str) -> bool:
    """检查 URL 是否在白名单中"""
    from urllib.parse import urlparse
    domain = urlparse(url).netloc
    return any(allowed in domain for allowed in ALLOWED_DOMAINS)
def safe_navigate(page, url: str):
    """安全导航"""
    if not is_url_allowed(url):
        raise ValueError(f"URL 不在白名单中: {url}")
    page.goto(url)

日志脱敏

import re
def sanitize_log(log_text: str) -> str:
    """日志脱敏"""
    # 脱敏 API Key
    log_text = re.sub(r'sk-[a-zA-Z0-9]{20,}', 'sk-***', log_text)
    # 脱敏 Access Key
    log_text = re.sub(r'LTAI[a-zA-Z0-9]{12,}', 'LTAI***', log_text)
    # 脱敏密码
    log_text = re.sub(r'password["\s:=]+[^"\s,}]+', 'password: ***', log_text, flags=re.IGNORECASE)
    return log_text
# 使用
print(sanitize_log(f"使用 API Key: {API_KEY}"))

可观测性与监控

日志记录最佳实践

import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'sandbox_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class MonitoredSandboxManager:
    """带监控的 Sandbox 管理器"""
    def create_sandbox(self, **kwargs):
        """创建 Sandbox(带日志)"""
        start_time = time.time()
        logger.info(f"开始创建 Sandbox: {kwargs}")
        try:
            sandbox = Sandbox.create(**kwargs)
            duration = time.time() - start_time
            logger.info(f"Sandbox 创建成功: {sandbox.sandbox_id}, 耗时: {duration:.2f}s")
            return sandbox
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"Sandbox 创建失败: {e}, 耗时: {duration:.2f}s")
            raise
    def execute_task(self, sandbox, task_name: str, operation):
        """执行任务(带日志)"""
        start_time = time.time()
        logger.info(f"开始执行任务: {task_name}, Sandbox: {sandbox.sandbox_id}")
        try:
            result = operation(sandbox)
            duration = time.time() - start_time
            logger.info(f"任务执行成功: {task_name}, 耗时: {duration:.2f}s")
            return result
        except Exception as e:
            duration = time.time() - start_time
            logger.error(f"任务执行失败: {task_name}, 错误: {e}, 耗时: {duration:.2f}s")
            raise

指标收集

from dataclasses import dataclass
from typing import Dict, List
import json
@dataclass
class SandboxMetrics:
    """Sandbox 指标"""
    sandbox_id: str
    create_time: float
    destroy_time: float = None
    total_requests: int = 0
    failed_requests: int = 0
    total_duration: float = 0.0
class MetricsCollector:
    """指标收集器"""
    def __init__(self):
        self.metrics: Dict[str, SandboxMetrics] = {}
    def record_creation(self, sandbox_id: str):
        """记录创建"""
        self.metrics[sandbox_id] = SandboxMetrics(
            sandbox_id=sandbox_id,
            create_time=time.time()
        )
    def record_request(self, sandbox_id: str, duration: float, success: bool):
        """记录请求"""
        if sandbox_id in self.metrics:
            metric = self.metrics[sandbox_id]
            metric.total_requests += 1
            metric.total_duration += duration
            if not success:
                metric.failed_requests += 1
    def record_destruction(self, sandbox_id: str):
        """记录销毁"""
        if sandbox_id in self.metrics:
            self.metrics[sandbox_id].destroy_time = time.time()
    def export_metrics(self, filepath: str):
        """导出指标"""
        metrics_data = [
            {
                'sandbox_id': m.sandbox_id,
                'create_time': m.create_time,
                'destroy_time': m.destroy_time,
                'total_requests': m.total_requests,
                'failed_requests': m.failed_requests,
                'success_rate': (m.total_requests - m.failed_requests) / m.total_requests if m.total_requests > 0 else 0,
                'avg_duration': m.total_duration / m.total_requests if m.total_requests > 0 else 0,
                'lifetime': m.destroy_time - m.create_time if m.destroy_time else time.time() - m.create_time
            }
            for m in self.metrics.values()
        ]
        with open(filepath, 'w') as f:
            json.dump(metrics_data, f, indent=2)
# 使用
collector = MetricsCollector()
collector.record_creation(sandbox.sandbox_id)
# ... 执行任务 ...
collector.export_metrics('metrics.json')

成本优化

按需创建与销毁

class CostOptimizedManager:
    """成本优化的管理器"""
    def __init__(self, idle_threshold=300):
        self.idle_threshold = idle_threshold
        self.sandboxes = {}
        self.last_used = {}
    def get_sandbox(self, key: str):
        """获取 Sandbox(懒加载)"""
        if key not in self.sandboxes:
            self.sandboxes[key] = Sandbox.create(
                template_type=TemplateType.BROWSER,
                template_name=os.getenv("BROWSER_TEMPLATE_NAME"),
                sandbox_idle_timeout_seconds=self.idle_threshold
            )
        self.last_used[key] = time.time()
        return self.sandboxes[key]
    def cleanup_idle(self):
        """清理闲置 Sandbox"""
        current_time = time.time()
        to_remove = []
        for key, last_time in self.last_used.items():
            if current_time - last_time > self.idle_threshold:
                to_remove.append(key)
        for key in to_remove:
            if key in self.sandboxes:
                self.sandboxes[key].delete()
                del self.sandboxes[key]
                del self.last_used[key]
                logger.info(f"清理闲置 Sandbox: {key}")

批量任务处理

async def batch_process_tasks(tasks: List[str], pool_size: int = 5):
    """批量处理任务(复用 Sandbox)"""
    pool = SandboxPool(pool_size=pool_size)
    results = []
    for task in tasks:
        sandbox = pool.acquire()
        try:
            # 处理任务
            result = await process_task(sandbox, task)
            results.append(result)
        finally:
            pool.release(sandbox)
    return results

生产环境部署

环境配置

开发环境 (.env.dev):

# 开发环境配置
BROWSER_TEMPLATE_NAME=dev-browser-template
SANDBOX_IDLE_TIMEOUT=7200
POOL_SIZE=2
LOG_LEVEL=DEBUG

生产环境 (.env.prod):

# 生产环境配置
BROWSER_TEMPLATE_NAME=prod-browser-template
SANDBOX_IDLE_TIMEOUT=300
POOL_SIZE=10
LOG_LEVEL=INFO
ENABLE_METRICS=true
METRICS_EXPORT_INTERVAL=300

高可用架构

image

健康检查

from flask import Flask, jsonify
app = Flask(__name__)
manager = SandboxManager()
@app.route('/health')
def health_check():
    """健康检查端点"""
    try:
        # 检查 Sandbox 是否可用
        sandbox = manager.get_or_create()
        # 简单的健康检查
        is_healthy = hasattr(sandbox, 'sandbox_id')
        if is_healthy:
            return jsonify({
                'status': 'healthy',
                'sandbox_id': sandbox.sandbox_id,
                'timestamp': time.time()
            }), 200
        else:
            return jsonify({
                'status': 'unhealthy',
                'error': 'Sandbox not available'
            }), 503
    except Exception as e:
        return jsonify({
            'status': 'unhealthy',
            'error': str(e)
        }), 503
@app.route('/metrics')
def metrics():
    """指标端点"""
    collector = MetricsCollector()
    # 返回当前指标
    return jsonify({
        'total_sandboxes': len(collector.metrics),
        'timestamp': time.time()
    })

故障排查与常见问题

连接问题

问题:无法连接到 Sandbox

排查步骤

def diagnose_connection(sandbox):
    """诊断连接问题"""
    print(f"1. 检查 Sandbox ID: {sandbox.sandbox_id}")
    print(f"2. 检查 CDP URL: {sandbox.cdp_url}")
    # 测试 CDP 连接
    try:
        with sync_playwright() as p:
            browser = p.chromium.connect_over_cdp(sandbox.cdp_url)
            print("✓ CDP 连接成功")
            browser.close()
    except Exception as e:
        print(f"✗ CDP 连接失败: {e}")
    # 测试 VNC 连接
    print(f"3. VNC URL: {sandbox.vnc_url}")
    print("提示: 可以在浏览器中打开 VNC URL 测试连接")

超时问题

问题:任务执行超时

解决方案

def handle_timeout(sandbox, operation, max_retries=3):
    """处理超时(带重试)"""
    for attempt in range(max_retries):
        try:
            return operation(sandbox, timeout=30000)
        except TimeoutError:
            logger.warning(f"任务超时(尝试 {attempt + 1}/{max_retries})")
            if attempt == max_retries - 1:
                # 最后一次尝试失败,重建 Sandbox
                logger.error("多次超时,重建 Sandbox")
                sandbox.delete()
                sandbox = Sandbox.create(
                    template_type=TemplateType.BROWSER,
                    template_name=os.getenv("BROWSER_TEMPLATE_NAME")
                )
                return operation(sandbox, timeout=60000)

性能问题

问题:响应速度慢

优化建议

  1. 使用连接池:预先创建多个 Sandbox 实例
  2. 启用 keep_alive:保持浏览器会话,避免重复建立连接
  3. 合理设置超时:根据任务复杂度调整超时时间
  4. 并发控制:限制并发请求数,避免资源竞争
# 性能优化配置示例
browser_session = BrowserSession(
    cdp_url=sandbox.cdp_url,
    browser_profile=BrowserProfile(
        timeout=30000,          # 30秒超时
        keep_alive=True,        # 保持连接
        disable_security=False  # 保持安全检查
    )
)

错误码参考

image

总结

通过本指南,您已经掌握了:

  1. BrowserUse 集成: 如何使用 BrowserUse 框架实现智能浏览器自动化
  2. 生命周期管理: 三种 Sandbox 管理模式的选择和实现
  3. 性能优化: 超时配置、复用策略、错误重试机制
  4. 安全实践: 环境变量保护、URL 白名单、日志脱敏
  5. 可观测性: 日志记录、指标收集、监控告警
  6. 成本优化: 按需创建、闲置清理、批量处理
  7. 生产部署: 高可用架构、健康检查、故障排查

关注「阿里云云原生」公众号,后台回复:BrowserUse

获取参考代码

立即体验函数计算 AgentRun

函数计算 AgentRun 的无代码到高代码演进能力,现已开放体验:

  1. 快速创建:访问控制台(https://functionai.console.aliyun.com/cn-hangzhou/agent/explore),60 秒创建你的第一个 Agent
  2. 深度定制:当需要更复杂功能时,一键转换为高代码
  3. 持续演进:利用函数计算 AgentRun 的基础设施能力,持续优化你的 Agent

从想法到上线,从原型到生产,函数计算 AgentRun 始终是你最好的伙伴。欢迎加入“函数计算 AgentRun 客户群”,钉钉群号: 134570017218

快速了解函数计算 AgentRun:

一句话介绍:函数计算 AgentRun 是一个以高代码为核心的一站式 Agentic AI 基础设施平台。秉持生态开放和灵活组装的理念,为企业级 Agent 应用提供从开发、部署到运维的全生命周期管理。

image

函数计算 AgentRun 架构图

AgentRun 运行时基于阿里云函数计算 FC 构建,继承了 Serverless 计算极致弹性、按量付费、零运维的核心优势。通过深度集成 AgentScope、LangChain、RAGFlow、Mem0 等主流开源生态。函数计算 AgentRun 将 Serverless 的极致弹性、零运维和按量付费的特性与 AI 原生应用场景深度融合,助力企业实现成本与效率的极致优化,平均 TCO 降低 60% 。 

让开发者只需专注于 Agent 的业务逻辑创新,无需关心底层基础设施,让 Agentic AI 真正进入企业生产环境。

推荐阅读:

最近接触到一款支持自动学生认证的程序,便萌生了为其补充完善功能的想法。开发过程中,我发现某平台可购得单价 3.5 元的邮箱账号,这类账号仅支持手机端登录,且未开启双因素认证(2FA)。针对这一情况,我为程序新增了三项核心功能:2FA 自动设置年龄验证校验(毕竟没有学生资格的账号,大概率是因为未完成 2FA 验证)、2FA 自动修改,以及虚拟卡自动绑定

整个项目全程基于 Claude 和 Codex 工具开发,目前已实现根据输入的虚拟卡信息,自动完成年龄验证与卡片绑定的功能。需要说明的是,我并非账号经销商,项目也未植入任何广告。此外,程序尚存不少待修复的 bug,且我后续不会对其进行维护迭代 —— 开发这个工具只是出于个人兴趣,我仅注册了二十个账号用于自身日常开发,完全不涉及任何盈利行为。

需要配置比特浏览器使用,会自动创建窗口,以及你的浏览器配置过密码删除窗口,也会自动删除窗口。

很多账号没有学生资格,以及你被拉入家庭组之后,反重力还是无法用,很大概率就是没有年龄验证,这个很重要。


📌 转载信息
原作者:
16627517673
转载时间:
2026/1/23 09:08:18

编者按: 我们今天为大家带来的这篇文章,作者的核心观点是:相较于依赖复杂且高成本的动态 MCP 工具加载机制,以 Skills 为核心的能力摘要与自维护模式,在当前阶段反而更加高效、稳定且可控。

文章系统梳理了延迟工具加载(deferred tool loading)的工程现实与限制,指出即便工具可以延后注入,对话级别的工具集合仍然是静态的,且发现机制高度依赖正则匹配,收益并不如预期。作者进一步深入分析了 MCP 在上下文占用、API 稳定性、缓存失效与推理轨迹丢失等方面带来的隐性成本,并结合 Sentry MCP、Playwright 等实践案例,说明为何将 MCP 转换为 Skills,反而能让 Agent 更好地发挥既有工具的能力。文章最后还探讨了 MCP 是否可能完全转化为 Skills 的可行性,并坦率指出当前协议与生态在稳定性与摘要机制上的不足。

作者 | Armin Ronacher

(作者为 Flask、Jinja2 等开源项目的创建者)

编译 | 岳扬

我正把所有的 MCP 都迁移到 Skills 上,包括之前还在使用的最后一个:Sentry MCP(译者注:Sentry 是流行的应用监控与错误追踪平台)。早前我就已经完全弃用 Playwright(译者注:由 Microsoft 开发的现代 Web 自动化测试和浏览器自动化框架),转向使用 Playwright Skill。

过去一个月左右,关于使用“动态工具配置(dynamic tool loadouts)[1]”来推迟工具定义的加载的讨论一直不少。Anthropic 也在探索通过代码来串联 MCP 调用的思路,这一点我也尝试过[2]。

我想分享一下自己在这方面的最新心得,以及为什么 Anthropic 提出的“延迟工具加载方案(deferred tool loading)”并未改变我对 MCP 的看法。或许这些内容对他人会有所帮助。

01 什么是工具(Tool)?

当 Agent 通过强化学习或其他方式接触到工具定义时,它会被鼓励在遇到适合使用该工具的场景时,通过特殊的 token 输出工具调用。实际上,工具定义只能出现在系统提示词(system prompt)中特定的工具定义 token 之间。从历史经验来看,这意味着我们无法在对话状态的中途动态发出新的工具定义。因此,唯一的现实选择是在对话开始时就将工具加载好。

在智能体应用场景中,我们当然可以随时压缩对话状态,或更改系统消息中的工具定义。但这样做的后果是,我们会丢失推理轨迹(reasoning traces)以及缓存(cache)。以 Anthropic 为例,这将大幅增加对话成本:基本上就是从头开始,相比于缓存读取,需要支付完整的 token 费用,外加缓存写入成本。

Anthropic 最近的一项创新是“延迟工具加载”(deferred tool loading)。我们仍然需要提前在系统提示词(system message)中声明工具,但这些工具不会在系统提示词发出时就注入到对话中,而是会稍后才出现。不过据我所知,这些工具定义在整个对话过程中仍必须是静态的 —— 也就是说,哪些工具可能存在,是在对话开始时就确定好的。 Anthropic 发现这些工具的方式,纯粹是通过正则表达式(regex)搜索实现的。

02 与 Skills 的对比

尽管带延迟加载的 MCP 感觉上应该表现更优,实际上却需要在 LLM API 端做不少工程化工作。而 Skills 系统完全不需要这些,至少从我的经验来看,其表现依然更胜一筹。

Skills 实质上只是对现有能力及其说明文件位置的简短摘要。这些信息会被主动加载到上下文中。 因此,智能体能在系统上下文里(或上下文的其他位置)知晓自己具备哪些能力,并获知如何使用这些能力的“手册链接”。

关键在于,Skills 并不会真正把工具定义加载到上下文中。 可用工具保持不变:bash 以及智能体已有的其他工具。Skills 所能提供的,只是如何更高效使用这些工具的技巧和方法。

由于 Skills 主要教的是如何使用其他命令行工具和类似实用程序,因此组合与协调这些工具的基本方式其实并未改变。让 Claude 系列模型成为优秀工具调用者的强化学习机制,恰好能帮助处理这些新发现的工具。

03 MCP 能否转换为 Skills?

这自然引出了一个问题:既然 Skills 效果这么好,我能不能把 MCP 完全移出上下文,转而像 Anthropic 提议的那样,通过 CLI 来调用它?答案是:可以,但效果并不好。Peter Steinberger 的 mcporter[3] 就是其中一种方案。简单来说,它会读取 .mcp.json 文件,并将背后的 MCP 暴露为可调用的工具:

npx mcporter call 'linear.create_comment(issueId: "ENG-123", body: "Looks good!")'

确实,它看起来非常像一个 LLM 可以调用的命令行工具。但问题在于,LLM 根本不知道有哪些工具可用 —— 现在你得专门教它。于是你可能会想:那为什么不创建一些 Skills,来教 LLM 了解这些 MCP 呢?对我而言,这里的问题在于:MCP 服务器根本没有维持 API 稳定性的意愿。它们越来越倾向于将工具定义精简到极致,只为节省 token。 这种做法有其道理,但对 Skills 模式来说却适得其反。举个例子,Sentry MCP 服务器曾彻底将查询语法切换为自然语言。这对 Agent 来说是一次重大改进,但我之前关于如何使用它的建议反而成了障碍,而且我没能第一时间发现问题。

这其实和 Anthropic 的“延迟工具加载方案”非常相似:上下文中完全没有任何关于该工具的信息,我们必须手动创建一份摘要。我们过去对 MCP 工具采用的预加载(eager loading)方式,如今陷入了一个尴尬的局面:描述既太长,不便预加载;又太短,无法真正教会 Agent 如何使用它们。 因此,至少从我的经验来看,你最终还是得为通过 mcporter 或类似方式暴露出来的 MCP 工具,手动维护这些 Skills 摘要。

04 最省事的路线

这让我得出了目前的结论:我倾向于选择最省事的方式,也就是让 Agent 自己以“Skills”的形式编写所需的工具。 这样做不仅耗时不多,最大的好处还在于工具基本处于我的掌控之中。每当它出问题或需要新增功能时,我就让 Agent 去调整它。Sentry MCP 就是个很好的例子 —— 我认为它可能是目前设计得最好的 MCP 之一,但我已经不再使用它了。一方面是因为一旦在上下文中立即加载它,就会直接消耗约 8k 个 token;另一方面,我也一直没能通过 mcporter 让它正常工作。现在我让 Claude 为我维护一个对应的 Skill。没错,这个 Skill 可能有不少 bug,也需要不断更新,但由于是 Agent 自己维护的,整体效果反而更好。

当然,这一切很可能在未来发生变化。但就目前而言,手动维护的 Skills,以及让 Agent 自行编写工具,已成为我的首选方式。我推测,基于 MCP 的动态工具加载终将成为主流,但要实现这一点,可能还需要一系列协议层面的改进,以便引入类似 Skills 的摘要机制,以及为工具内置使用手册。 我也认为,MCP 如果能具备更强的协议稳定性,将大有裨益。目前 MCP 服务器随意更改工具描述的做法,与那些已经固化下来的调用方式(materialized calls)以及在 README 和技能文件中编写的外部工具说明很难兼容。

END

本期互动内容 🍻

❓抛开现有方案,你理想中的AI工具调用范式应该长什么样?用一句话描述你最核心的需求。

文中链接

[1]https://www.anthropic.com/engineering/advanced-tool-use

[2]https://lucumr.pocoo.org/2025/7/3/tools/

[3]https://github.com/steipete/mcporter

原文链接:

https://lucumr.pocoo.org/2025/12/13/skills-vs-mcp/

作者:辰泉

前言

在 Agentic AI 时代,智能体需要与真实世界交互,而浏览器是连接虚拟世界与现实世界的重要桥梁。AgentRun Browser Sandbox 为智能体提供了安全、高性能、免运维的浏览器执行环境,让 AI Agent 真正具备“上网”的能力——从网页抓取、信息提取到表单填写、自动化操作,一切皆可实现。

AgentRun Browser Sandbox 介绍

什么是 Browser Sandbox?

Browser Sandbox 是 AgentRun 平台提供的云原生无头浏览器沙箱服务,基于阿里云函数计算(FC)构建。它为智能体提供了一个安全隔离的浏览器执行环境,支持通过标准的 Chrome DevTools Protocol (CDP) 远程控制浏览器实例。

核心特性

无头浏览器能力

  • 内置 Chromium/Chrome 浏览器,支持完整的 Web 标准
  • 原生兼容 Puppeteer、Playwright 等主流自动化框架
  • 支持通过 CDP 协议进行精细化控制

实时可视化

  • 内置 VNC 服务,支持实时查看浏览器界面
  • 提供操作录制功能,方便调试和回放
  • 支持通过 noVNC 客户端在网页中直接交互

安全与隔离

  • 每个沙箱实例运行在独立的容器环境中
  • 文件系统和进程空间完全隔离
  • 支持 WSS 加密传输,确保数据安全

Serverless 架构

  • 按需创建,按量付费,无需提前预置资源
  • 快速弹性伸缩,支持高并发场景
  • 零运维,无需管理服务器和浏览器依赖

主要应用场景

  • AI Agent 赋能: 为大模型提供“眼睛”和“手”,执行网页浏览、信息提取、在线操作等任务
  • 自动化测试: 在云端运行端到端(E2E)测试和视觉回归测试
  • 数据采集: 稳定、高效地进行网页抓取,应对动态加载和反爬虫挑战
  • 内容生成: 自动化生成网页截图或 PDF 文档

上手使用 AgentRun Browser Sandbox

AgentRun SDK 快速介绍

后续的内容将基于 AgentRun SDK 进行,因此我们先对 SDK 进行简要介绍。

Agentrun SDK 是一个开源的开发者工具包,本期介绍 Python 版本。其旨在简化智能体与 AgentRun 平台各种服务(包括 Browser Sandbox)的集成。它提供了统一的接口,让您可以用几行代码就将沙箱能力集成到现有的 Agent 框架中。SDK 的核心功能如下:

统一集成接口

  • 提供对 LangChain、AgentScope 等主流框架的开箱即用支持
  • 统一的模型代理接口,简化多模型管理
  • 标准化的工具注册机制

Sandbox 生命周期管理

  • 自动创建和销毁沙箱实例
  • 支持会话级别的状态保持
  • 灵活的资源配置和超时控制

安装 AgentRun SDK

pip install agentrun-sdk[playwright,server]

注意: 确保您的 Python 环境版本在 3.10 及以上。

基本使用示例

以下是使用 AgentRun SDK 创建和管理 Browser Sandbox 的核心代码:

from agentrun.sandbox import Sandbox, TemplateType
from playwright.sync_api import sync_playwright
# 创建 Browser Sandbox
sandbox = Sandbox.create(
    template_type=TemplateType.BROWSER,
    template_name="your-template-name",
    sandbox_idle_timeout_seconds=300
)
# 获取 CDP URL(用于 Playwright 连接)
cdp_url = sandbox.get_cdp_url()
# 使用 Playwright 连接并操作
with sync_playwright() as p:
    browser = p.chromium.connect_over_cdp(cdp_url)
    page = browser.contexts[0].pages[0]
    page.goto("https://www.example.com")
    page.screenshot(path="screenshot.png")
    browser.close()
# 销毁 Sandbox
sandbox.delete()

关键概念:

  • template_name: 控制台创建的浏览器环境模板
  • cdp_url: 用于 Playwright/Puppeteer 连接
  • vnc_url: 用于实时查看浏览器画面(可通过 sandbox.get_cdp_url() 获取)

注意: 由于所有浏览器操作都在云端进行,您无需在本地安装浏览器。Playwright 仅用于通过 CDP 协议连接到云端的浏览器实例。

如何创建 Sandbox 模板

使用 Browser Sandbox 需要新建 Sandbox 模板,您需要访问 AgentRun 控制台网站 [ 1] ,并按照如下步骤创建模板:

  1. 在顶部菜单栏选择“运行时与沙箱”;
  2. 在左侧边栏选择“Sandbox 沙箱”;
  3. 点击右上角“创建沙箱模板”;

image

  1. 选择“浏览器”;

image

  1. 在弹出的抽屉对话框中填写和选择您的模板的规格、网络等配置,并复制模板名称;

image

  1. 点击“创建浏览器”等待其就绪即可。

从零开始用 LangChain 创建 Browser Sandbox 智能体

本教程将指导您从零开始创建一个完整的 Browser Sandbox 智能体项目。

基于 LangChain 集成 Browser Sandbox

本教程将详细讲解如何使用 LangChain 创建 Browser Sandbox 相关的 Tools 并集成到 Agent 中。

项目结构

为了保持代码的内聚性和可维护性,我们将代码拆分为以下模块:

模块职责划分:

sandbox_manager.py:负责 Sandbox 的创建、管理和销毁,提供统一的接口
langchain_agent.py:负责创建 LangChain Tools 和 Agent,集成 VNC 信息
main.py:作为入口文件,演示如何使用上述模块

步骤 1:创建项目并安装依赖

首先创建项目目录(如果还没有):

mkdir -p langchain-demo
cd langchain-demo

创建 requirements.txt 文件,内容如下:

# LangChain 核心库
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
# AgentRun SDK
agentrun-sdk[playwright,server]>=0.0.8
# 浏览器自动化
playwright>=1.40.0
# 环境变量管理
python-dotenv>=1.0.0

然后安装依赖:

pip install -r requirements.txt

主要依赖说明:

  • langchain 和 langchain-openai:LangChain 核心库
  • agentrun-sdk[playwright,server]:AgentRun SDK,用于 Sandbox 管理
  • playwright:浏览器自动化库
  • python-dotenv:环境变量管理

步骤 2:配置环境变量

在项目根目录创建 .env 文件,配置以下环境变量:

# 阿里云百炼平台的 API Key,用于调用大模型能力
# 请前往 https://bailian.console.aliyun.com/?tab=app#/api-key 创建和查看
DASHSCOPE_API_KEY=sk-your-bailian-api-key
# 阿里云账号的访问密钥 ID 和访问密钥 Secret,用于 AgentRun SDK 鉴权
ALIBABA_CLOUD_ACCESS_KEY_ID=your-ak
ALIBABA_CLOUD_ACCESS_KEY_SECRET=your-sk
ALIBABA_CLOUD_ACCOUNT_ID=your-main-account-id
ALIBABA_CLOUD_REGION=cn-hangzhou
# browser sandbox 模板的名称,可以在 https://functionai.console.aliyun.com/cn-hangzhou/agent/runtime/sandbox 控制台创建
BROWSER_TEMPLATE_NAME=sandbox-your-template-name
# agentrun 的控制面和数据面的 API 端点请求地址,默认cn-hangzhou
AGENTRUN_CONTROL_ENDPOINT=agentrun.cn-hangzhou.aliyuncs.com
AGENTRUN_DATA_ENDPOINT=https://${your-main-account-id}.agentrun-data.cn-hangzhou.aliyuncs.com

步骤 3:创建 Sandbox 生命周期管理模块

创建 sandbox_manager.py 文件,负责 Sandbox 的创建、管理和销毁。核心代码如下:

"""
Sandbox 生命周期管理模块
负责 AgentRun Browser Sandbox 的创建、管理和销毁。
提供统一的接口供 LangChain Agent 使用。
"""
import os
from typing import Optional, Dict, Any
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
class SandboxManager:
    """Sandbox 生命周期管理器"""
    def __init__(self):
        self._sandbox: Optional[Any] = None
        self._sandbox_id: Optional[str] = None
        self._cdp_url: Optional[str] = None
        self._vnc_url: Optional[str] = None
    def create(
        self,
        template_name: Optional[str] = None,
        idle_timeout: int = 3000
    ) -> Dict[str, Any]:
        """
        创建或获取一个浏览器 sandbox 实例
        Args:
            template_name: Sandbox 模板名称,如果为 None 则从环境变量读取
            idle_timeout: 空闲超时时间(秒),默认 3000 秒
        Returns:
            dict: 包含 sandbox_id, cdp_url, vnc_url 的字典
        Raises:
            RuntimeError: 创建失败时抛出异常
        """
        try:
            from agentrun.sandbox import Sandbox, TemplateType
            # 如果已有 sandbox,直接返回
            if self._sandbox is not None:
                return self.get_info()
            # 从环境变量获取模板名称
            if template_name is None:
                template_name = os.getenv(
                    "BROWSER_TEMPLATE_NAME",
                    "sandbox-browser-demo"
                )
            # 创建 sandbox
            self._sandbox = Sandbox.create(
                template_type=TemplateType.BROWSER,
                template_name=template_name,
                sandbox_idle_timeout_seconds=idle_timeout
            )
            self._sandbox_id = self._sandbox.sandbox_id
            self._cdp_url = self._get_cdp_url()
            self._vnc_url = self._get_vnc_url()
            return self.get_info()
        except ImportError as e:
            print(e)
            raise RuntimeError(
                "agentrun-sdk 未安装,请运行: pip install agentrun-sdk[playwright,server]"
            )
        except Exception as e:
            raise RuntimeError(f"创建 Sandbox 失败: {str(e)}")
    def get_info(self) -> Dict[str, Any]:
        """
        获取当前 sandbox 的信息
        Returns:
            dict: 包含 sandbox_id, cdp_url, vnc_url 的字典
        Raises:
            RuntimeError: 如果没有活动的 sandbox
        """
        if self._sandbox is None:
            raise RuntimeError("没有活动的 sandbox,请先创建")
        return {
            "sandbox_id": self._sandbox_id,
            "cdp_url": self._cdp_url,
            "vnc_url": self._vnc_url,
        }
    def get_cdp_url(self) -> Optional[str]:
        """获取 CDP URL"""
        return self._sandbox.get_cdp_url()
    def get_vnc_url(self) -> Optional[str]:
        """获取 VNC URL"""
        return self._sandbox.get_vnc_url()
    def get_sandbox_id(self) -> Optional[str]:
        """获取 Sandbox ID"""
        return self._sandbox_id
    def destroy(self) -> str:
        """
        销毁当前的 sandbox 实例
        Returns:
            str: 操作结果描述
        """
        if self._sandbox is None:
            return "没有活动的 sandbox"
        try:
            sandbox_id = self._sandbox_id
            # 尝试销毁 sandbox
            if hasattr(self._sandbox, 'delete'):
                self._sandbox.delete()
            elif hasattr(self._sandbox, 'stop'):
                self._sandbox.stop()
            elif hasattr(self._sandbox, 'destroy'):
                self._sandbox.destroy()
            # 清理状态
            self._sandbox = None
            self._sandbox_id = None
            self._cdp_url = None
            self._vnc_url = None
            return f"Sandbox 已销毁: {sandbox_id}"
        except Exception as e:
            # 即使销毁失败,也清理本地状态
            self._sandbox = None
            self._sandbox_id = None
            self._cdp_url = None
            self._vnc_url = None
            return f"销毁 Sandbox 时出错: {str(e)}"
    def is_active(self) -> bool:
        """检查 sandbox 是否活跃"""
        return self._sandbox is not None
    def __enter__(self):
        """上下文管理器入口"""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """上下文管理器退出,自动销毁"""
        self.destroy()
        return False
# 全局单例(可选,用于简单场景)
_global_manager: Optional[SandboxManager] = None
def get_global_manager() -> SandboxManager:
    """获取全局 SandboxManager 单例"""
    global _global_manager
    if _global_manager is None:
        _global_manager = SandboxManager()
    return _global_manager
def reset_global_manager():
    """重置全局 SandboxManager"""
    global _global_manager
    if _global_manager:
        _global_manager.destroy()
    _global_manager = None

关键功能:

  1. 创建 Sandbox: 使用 AgentRun SDK 创建浏览器 Sandbox
  2. 获取连接信息: 自动获取 CDP URL 和 VNC URL,支持多种属性名兼容
  3. 生命周期管理: 提供销毁方法,确保资源正确释放

步骤 4:创建 LangChain Tools 和 Agent

创建 langchain_agent.py 文件,定义 LangChain Tools 并创建 Agent。核心代码如下:

"""
LangChain Agent 和 Tools 注册模块
负责创建 LangChain Agent,注册 Sandbox 相关的 tools,并集成 VNC 可视化。
本模块使用 sandbox_manager.py 中封装的 SandboxManager 来管理 sandbox 生命周期。
"""
import os
from dotenv import load_dotenv
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from pydantic import BaseModel, Field
# 导入 sandbox 管理器
from sandbox_manager import SandboxManager
# 加载环境变量
load_dotenv()
# 全局 sandbox 管理器实例(单例模式)
_sandbox_manager: SandboxManager | None = None
def get_sandbox_manager() -> SandboxManager:
    """获取 sandbox 管理器实例(单例模式)"""
    global _sandbox_manager
    if _sandbox_manager is None:
        _sandbox_manager = SandboxManager()
    return _sandbox_manager
# ============ LangChain Tools 定义 ============
@tool
def create_browser_sandbox(
    template_name: str = None,
    idle_timeout: int = 3000
) -> str:
    """创建或获取一个浏览器 sandbox 实例。
    当需要访问网页、执行浏览器操作时,首先需要创建 sandbox。
    创建成功后,会返回 sandbox 信息,包括 VNC URL 用于可视化。
    Args:
        template_name: Sandbox 模板名称,如果不提供则从环境变量 BROWSER_TEMPLATE_NAME 读取
        idle_timeout: 空闲超时时间(秒),默认 3000 秒
    Returns:
        Sandbox 信息字符串,包括 ID、CDP URL、VNC URL
    """
    try:
        manager = get_sandbox_manager()
        # 如果 template_name 为空字符串,转换为 None 以便从环境变量读取
        if template_name == "":
            template_name = None
        info = manager.create(template_name=template_name, idle_timeout=idle_timeout)
        result = f"""✅ Sandbox 创建成功!
📋 Sandbox 信息:
- ID: {info['sandbox_id']}
- CDP URL: {info['cdp_url']}
"""
        vnc_url = info.get('vnc_url')
        if vnc_url:
            result += f"- VNC URL: {vnc_url}\n\n"
            result += "提示: VNC 查看器应该已自动打开,您可以在浏览器中实时查看浏览器操作。"
        else:
            result += "\n警告: 未获取到 VNC URL,可能无法使用可视化功能。"
        return result
    except Exception as e:
        return f" 创建 Sandbox 失败: {str(e)}"
@tool
def get_sandbox_info() -> str:
    """获取当前 sandbox 的详细信息,包括 ID、CDP URL、VNC URL 等。
    当需要查看当前 sandbox 状态或获取 VNC 连接信息时使用此工具。
    Returns:
        Sandbox 信息字符串
    """
    try:
        manager = get_sandbox_manager()
        info = manager.get_info()
        result = f"""📋 当前 Sandbox 信息:
- Sandbox ID: {info['sandbox_id']}
- CDP URL: {info['cdp_url']}
"""
        if info.get('vnc_url'):
            result += f"- VNC URL: {info['vnc_url']}\n\n"
            result += "您可以使用 VNC URL 在浏览器中实时查看操作过程。\n"
            result += "   推荐使用 vnc.html 文件或 noVNC 客户端。"
        return result
    except RuntimeError as e:
        return f" {str(e)}"
    except Exception as e:
        return f" 获取 Sandbox 信息失败: {str(e)}"
class NavigateInput(BaseModel):
    """浏览器导航输入参数"""
    url: str = Field(description="要访问的网页 URL,必须以 http:// 或 https:// 开头")
    wait_until: str = Field(
        default="load",
        description="等待页面加载的状态: load, domcontentloaded, networkidle"
    )
    timeout: int = Field(
        default=30000,
        description="超时时间(毫秒),默认 30000"
    )
@tool(args_schema=NavigateInput)
def navigate_to_url(url: str, wait_until: str = "load", timeout: int = 30000) -> str:
    """使用 sandbox 中的浏览器导航到指定 URL。
    当用户需要访问网页时使用此工具。导航后可以在 VNC 中实时查看页面。
    Args:
        url: 要访问的网页 URL
        wait_until: 等待页面加载的状态(load/domcontentloaded/networkidle)
        timeout: 超时时间(毫秒)
    Returns:
        导航结果描述
    """
    try:
        manager = get_sandbox_manager()
        if not manager.is_active():
            return " 错误: 请先创建 sandbox"
        # 验证 URL
        if not url.startswith(("http://", "https://")):
            return f" 错误: 无效的 URL 格式: {url}"
        cdp_url = manager.get_cdp_url()
        if not cdp_url:
            return " 错误: 无法获取 CDP URL"
        # 使用 Playwright 连接浏览器并导航
        try:
            from playwright.sync_api import sync_playwright
            with sync_playwright() as p:
                browser = p.chromium.connect_over_cdp(cdp_url)
                pages = browser.contexts[0].pages if browser.contexts else []
                if pages:
                    page = pages[0]
                else:
                    page = browser.new_page()
                page.goto(url, wait_until=wait_until, timeout=timeout)
                title = page.title()
                return f"已成功导航到: {url}\n📄 页面标题: {title}\n💡 您可以在 VNC 中查看页面内容。"
        except ImportError:
            return f"导航指令已发送: {url}\n💡 提示: 安装 playwright 以启用实际导航功能 (pip install playwright)"
        except Exception as e:
            return f" 导航失败: {str(e)}"
    except Exception as e:
        return f" 操作失败: {str(e)}"
@tool("browser_screenshot", description="在浏览器 sandbox 中截取当前页面截图")
def take_screenshot(filename: str = "screenshot.png") -> str:
    """截取浏览器当前页面的截图。
    Args:
        filename: 截图文件名,默认 "screenshot.png"
    Returns:
        操作结果
    """
    try:
        manager = get_sandbox_manager()
        if not manager.is_active():
            return " 错误: 请先创建 sandbox"
        cdp_url = manager.get_cdp_url()
        if not cdp_url:
            return " 错误: 无法获取 CDP URL"
        try:
            from playwright.sync_api import sync_playwright
            with sync_playwright() as p:
                browser = p.chromium.connect_over_cdp(cdp_url)
                pages = browser.contexts[0].pages if browser.contexts else []
                if pages:
                    page = pages[0]
                else:
                    return " 错误: 没有打开的页面"
                page.screenshot(path=filename)
                return f"截图已保存: {filename}"
        except ImportError:
            return " 错误: 需要安装 playwright (pip install playwright)"
        except Exception as e:
            return f" 截图失败: {str(e)}"
    except Exception as e:
        return f" 操作失败: {str(e)}"
@tool("destroy_sandbox", description="销毁当前的 sandbox 实例,释放资源。注意:仅在程序退出或明确需要释放资源时使用,不要在一轮对话后销毁。")
def destroy_sandbox() -> str:
    """销毁当前的 sandbox 实例。
    重要提示:此工具应该仅在以下情况使用:
    - 程序即将退出
    - 明确需要释放资源
    - 用户明确要求销毁
    不要在一轮对话完成后就销毁 sandbox,因为 sandbox 可以在多轮对话中复用。
    Returns:
        操作结果
    """
    try:
        manager = get_sandbox_manager()
        result = manager.destroy()
        return result
    except Exception as e:
        return f" 销毁失败: {str(e)}"
# ============ Agent 创建 ============
def create_browser_agent(system_prompt: str = None):
    """
    创建带有 sandbox 工具的 LangChain Agent
    Args:
        system_prompt: 自定义系统提示词,如果为 None 则使用默认提示词
    Returns:
        LangChain Agent 实例
    """
    # 配置 DashScope API
    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        raise ValueError("请设置环境变量 DASHSCOPE_API_KEY")
    base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    model_name = os.getenv("QWEN_MODEL", "qwen-plus")
    # 创建 LLM
    model = ChatOpenAI(
        model=model_name,
        api_key=api_key,
        base_url=base_url,
        temperature=0.7,
    )
    # 创建工具列表
    tools = [
        create_browser_sandbox,
        get_sandbox_info,
        navigate_to_url,
        take_screenshot,
        destroy_sandbox,
    ]
    # 默认系统提示词
    if system_prompt is None:
        system_prompt = """你是一个浏览器自动化助手,可以使用 sandbox 来访问和操作网页。
当用户需要访问网页时,请按以下步骤操作:
1. 首先创建或获取 sandbox(如果还没有)
2. 使用 navigate_to_url 导航到目标网页
3. 执行用户请求的操作
4. 如果需要,可以截取截图
重要提示:
- 创建 sandbox 后,会返回 VNC URL,用户可以使用它实时查看浏览器操作
- 所有操作都会在 VNC 中实时显示,方便调试和监控
- sandbox 可以在多轮对话中复用,不要在一轮对话完成后就销毁
- 只有在用户明确要求销毁时才使用 destroy_sandbox 工具
- 不要主动建议用户销毁 sandbox,除非用户明确要求
- 请始终用中文回复,确保操作准确、高效。"""
    # 创建 Agent
    agent = create_agent(
        model=model,
        tools=tools,
        system_prompt=system_prompt,
    )
    return agent
def get_available_tools():
    """获取所有可用的工具列表"""
    return [
        create_browser_sandbox,
        get_sandbox_info,
        navigate_to_url,
        take_screenshot,
        destroy_sandbox,
    ]

关键要点:

  1. Tool 定义: 使用 @tool 装饰器定义 LangChain Tools
  2. 类型提示: 所有参数必须有类型提示,用于生成工具 schema
  3. 文档字符串: 详细的文档字符串帮助 LLM 理解何时使用工具**
  4. 单例模式: 使用全局管理器实例确保 Sandbox 在会话中复用**

步骤 5:创建主入口文件

创建 main.py 文件,作为程序入口。核心代码如下:

"""
LangChain + AgentRun Browser Sandbox 集成示例
主入口文件,演示如何使用 LangChain Agent 与 AgentRun Browser Sandbox 集成。
"""
import os
import sys
import signal
import webbrowser
import urllib.parse
import threading
import http.server
import socketserver
from pathlib import Path
from dotenv import load_dotenv
from langchain_agent import create_browser_agent, get_sandbox_manager
# 加载环境变量
load_dotenv()
# 全局 HTTP 服务器实例
_http_server = None
_http_port = 8080
# 全局清理标志,用于防止重复清理
_cleanup_done = False
def start_http_server():
    """启动一个简单的 HTTP 服务器来提供 vnc.html"""
    global _http_server
    if _http_server is not None:
        return _http_port
    try:
        current_dir = Path(__file__).parent.absolute()
        class VNCRequestHandler(http.server.SimpleHTTPRequestHandler):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, directory=str(current_dir), **kwargs)
            def log_message(self, format, *args):
                # 静默日志,避免输出过多信息
                pass
        # 尝试启动服务器
        for port in range(_http_port, _http_port + 10):
            try:
                server = socketserver.TCPServer(("", port), VNCRequestHandler)
                server.allow_reuse_address = True
                # 在后台线程中运行服务器
                def run_server():
                    server.serve_forever()
                thread = threading.Thread(target=run_server, daemon=True)
                thread.start()
                _http_server = server
                return port
            except OSError:
                continue
        return None
    except Exception as e:
        print(f"启动 HTTP 服务器失败: {str(e)}")
        return None
def open_vnc_viewer(vnc_url: str):
    """
    自动打开 VNC 查看器并设置 VNC URL
    Args:
        vnc_url: VNC WebSocket URL
    """
    if not vnc_url:
        return
    try:
        # 获取当前文件所在目录
        current_dir = Path(__file__).parent.absolute()
        vnc_html_path = current_dir / "vnc.html"
        # 检查文件是否存在
        if not vnc_html_path.exists():
            print(f"警告: vnc.html 文件不存在: {vnc_html_path}")
            print_vnc_info(vnc_url)
            return
        # 启动 HTTP 服务器
        port = start_http_server()
        if port:
            # 编码 VNC URL 作为 URL 参数
            encoded_url = urllib.parse.quote(vnc_url, safe='')
            # 构建 HTTP URL
            http_url = f"http://localhost:{port}/vnc.html?url={encoded_url}"
            # 打开浏览器
            print(f"\n正在打开 VNC 查看器...")
            print(f"HTTP 服务器运行在: http://localhost:{port}")
            print(f"VNC URL: {vnc_url[:80]}...")
            print(f"完整 URL: {http_url[:100]}...")
            webbrowser.open(http_url)
            print(f"VNC 查看器已打开")
            print(f"VNC URL 已通过 URL 参数自动设置,页面加载后会自动连接")
        else:
            # 如果 HTTP 服务器启动失败,尝试使用 file:// 协议
            print(f"HTTP 服务器启动失败,尝试使用文件协议...")
            encoded_url = urllib.parse.quote(vnc_url, safe='')
            file_url = f"file://{vnc_html_path}?url={encoded_url}"
            webbrowser.open(file_url)
            print(f"VNC 查看器已打开(使用文件协议)")
            print(f"提示: 如果无法自动连接,请手动复制 VNC URL 到输入框")
    except Exception as e:
        print(f"自动打开 VNC 查看器失败: {str(e)}")
        print_vnc_info(vnc_url)
def print_vnc_info(vnc_url: str):
    """打印 VNC 连接信息"""
    if not vnc_url:
        return
    print("\n" + "=" * 60)
    print("VNC 可视化连接信息")
    print("=" * 60)
    print(f"\nVNC URL: {vnc_url}")
    print("\n使用方式:")
    print("   1. 使用 noVNC 客户端连接")
    print("   2. 或在浏览器中访问 VNC 查看器页面")
    print("   3. 实时查看浏览器操作过程")
    print("\n" + "=" * 60 + "\n")
def cleanup_sandbox():
    """
    清理 sandbox 资源
    这个函数可以被信号处理器、异常处理器和正常退出流程调用
    """
    global _cleanup_done
    # 防止重复清理
    if _cleanup_done:
        return
    _cleanup_done = True
    try:
        manager = get_sandbox_manager()
        if manager.is_active():
            print("\n" + "=" * 60)
            print("正在清理 sandbox...")
            print("=" * 60)
            result = manager.destroy()
            print(f"清理结果: {result}\n")
        else:
            print("\n没有活动的 sandbox 需要清理\n")
    except Exception as e:
        print(f"\n清理 sandbox 时出错: {str(e)}\n")
def signal_handler(signum, frame):
    """
    信号处理器,处理 Ctrl+C (SIGINT) 和其他信号
    Args:
        signum: 信号编号
        frame: 当前堆栈帧
    """
    print("\n\n收到中断信号,正在清理资源...")
    cleanup_sandbox()
    print("清理完成")
    sys.exit(0)
def main():
    """主函数"""
    global _cleanup_done
    # 重置清理标志
    _cleanup_done = False
    # 注册信号处理器,处理 Ctrl+C (SIGINT)
    signal.signal(signal.SIGINT, signal_handler)
    # 在 Windows 上,SIGBREAK 也可以处理
    if hasattr(signal, 'SIGBREAK'):
        signal.signal(signal.SIGBREAK, signal_handler)
    print("=" * 60)
    print("LangChain + AgentRun Browser Sandbox 集成示例")
    print("=" * 60)
    print()
    try:
        # 创建 Agent
        print("正在初始化 LangChain Agent...")
        agent = create_browser_agent()
        print("Agent 初始化完成\n")
        # 示例查询
        queries = [
            "创建一个浏览器 sandbox",
            "获取当前 sandbox 的信息,包括 VNC URL",
            "导航到 https://www.aliyun.com",
            "截取当前页面截图",
        ]
        # 执行查询
        for i, query in enumerate(queries, 1):
            print(f"\n{'=' * 60}")
            print(f"查询 {i}: {query}")
            print(f"{'=' * 60}\n")
            try:
                result = agent.invoke({
                    "messages": [{"role": "user", "content": query}]
                })
                # 提取最后一条消息的内容
                output = result.get("messages", [])[-1].content if isinstance(result.get("messages"), list) else result.get("output", str(result))
                print(f"\n结果:\n{output}\n")
                # 如果是创建 sandbox,自动打开 VNC 查看器
                if i == 1:
                    try:
                        # 等待一下确保 sandbox 完全创建
                        import time
                        time.sleep(1)
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            vnc_url = info.get('vnc_url')
                            if vnc_url:
                                print(f"\n检测到 VNC URL: {vnc_url[:80]}...")
                                open_vnc_viewer(vnc_url)
                                print_vnc_info(vnc_url)
                            else:
                                print("\n警告: 未获取到 VNC URL,请检查 sandbox 创建是否成功")
                    except Exception as e:
                        print(f"打开 VNC 查看器时出错: {str(e)}")
                        import traceback
                        traceback.print_exc()
                # 如果是获取信息,显示 VNC 信息
                elif i == 2:
                    try:
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            if info.get('vnc_url'):
                                print_vnc_info(info['vnc_url'])
                    except:
                        pass
            except Exception as e:
                print(f"查询失败: {str(e)}\n")
                import traceback
                traceback.print_exc()
        # 交互式查询
        print("\n" + "=" * 60)
        print("进入交互模式(输入 'quit' 或 'exit' 退出,Ctrl+C 或 Ctrl+D 中断)")
        print("=" * 60 + "\n")
        while True:
            try:
                user_input = input("请输入您的查询: ").strip()
            except EOFError:
                # 处理 Ctrl+D (EOF)
                print("\n\n检测到输入结束 (Ctrl+D),正在清理资源...")
                cleanup_sandbox()
                print("清理完成")
                break
            except KeyboardInterrupt:
                # 处理 Ctrl+C (在 input 调用期间)
                print("\n\n检测到中断信号 (Ctrl+C),正在清理资源...")
                cleanup_sandbox()
                print("清理完成")
                break
            if not user_input:
                continue
            if user_input.lower() in ['quit', 'exit', '退出']:
                print("\nBye")
                # 退出前清理 sandbox
                cleanup_sandbox()
                break
            try:
                result = agent.invoke({
                    "messages": [{"role": "user", "content": user_input}]
                })
                output = result.get("messages", [])[-1].content if isinstance(result.get("messages"), list) else result.get("output", str(result))
                print(f"\n结果:\n{output}\n")
                # 检查是否需要打开或显示 VNC 信息
                user_input_lower = user_input.lower()
                if "创建" in user_input_lower and "sandbox" in user_input_lower:
                    # 如果是创建 sandbox,自动打开 VNC 查看器
                    try:
                        # 等待一下确保 sandbox 完全创建
                        import time
                        time.sleep(1)
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            vnc_url = info.get('vnc_url')
                            if vnc_url:
                                print(f"\n检测到 VNC URL: {vnc_url[:80]}...")
                                open_vnc_viewer(vnc_url)
                                print_vnc_info(vnc_url)
                            else:
                                print("\n警告: 未获取到 VNC URL,请检查 sandbox 创建是否成功")
                    except Exception as e:
                        print(f"打开 VNC 查看器时出错: {str(e)}")
                        import traceback
                        traceback.print_exc()
                elif "sandbox" in user_input_lower or "vnc" in user_input_lower:
                    # 其他情况只显示信息
                    try:
                        manager = get_sandbox_manager()
                        if manager.is_active():
                            info = manager.get_info()
                            if info.get('vnc_url'):
                                print_vnc_info(info['vnc_url'])
                    except:
                        pass
            except Exception as e:
                print(f"查询失败: {str(e)}\n")
                import traceback
                traceback.print_exc()
        # 清理资源(仅在程序正常退出时)
        cleanup_sandbox()
    except KeyboardInterrupt:
        # 处理顶层 KeyboardInterrupt (Ctrl+C)
        print("\n\n检测到中断信号 (Ctrl+C),正在清理资源...")
        cleanup_sandbox()
        print("清理完成")
        sys.exit(0)
    except EOFError:
        # 处理顶层 EOFError (Ctrl+D)
        print("\n\n检测到输入结束 (Ctrl+D),正在清理资源...")
        cleanup_sandbox()
        print("清理完成")
        sys.exit(0)
    except ValueError as e:
        print(f"配置错误: {str(e)}")
        print("\n提示: 请确保已设置以下环境变量:")
        print("   - DASHSCOPE_API_KEY: DashScope API Key")
        print("   - ALIBABA_CLOUD_ACCOUNT_ID: 阿里云账号 ID")
        print("   - ALIBABA_CLOUD_ACCESS_KEY_ID: 访问密钥 ID")
        print("   - ALIBABA_CLOUD_ACCESS_KEY_SECRET: 访问密钥 Secret")
        print("   - ALIBABA_CLOUD_REGION: 区域(默认: cn-hangzhou)")
    except Exception as e:
        print(f"发生错误: {str(e)}")
        import traceback
        traceback.print_exc()
        # 发生错误时也尝试清理
        cleanup_sandbox()
if __name__ == "__main__":
    main()

关键功能:

  1. VNC 自动打开: 创建 Sandbox 后自动打开 VNC 查看器
  2. 信号处理: 捕获 Ctrl+C,确保资源正确清理
  3. 交互模式: 支持持续对话,复用 Sandbox 实例

VNC 可视化集成

VNC(Virtual Network Computing)功能允许您实时查看和监控浏览器在 Sandbox 中的操作过程,这对于调试和监控 Agent 行为非常有用。

获取 VNC URL:

创建 Sandbox 后,可以通过 get_sandbox_info tool 获取 VNC URL:

# 通过 Agent 调用
result = agent.invoke({
    "messages": [{"role": "user", "content": "获取 sandbox 信息"}]
})
# 或直接通过管理器获取
manager = get_sandbox_manager()
info = manager.get_info()
vnc_url = info['vnc_url']

自动打开 VNC 查看器:

在 main.py 中,我们实现了自动打开 VNC 查看器的功能:

import webbrowser
import urllib.parse
from pathlib import Path
def open_vnc_viewer(vnc_url: str):
    """自动打开 VNC 查看器"""
    current_dir = Path(__file__).parent.absolute()
    vnc_html_path = current_dir / "vnc.html"
    if vnc_html_path.exists():
        # 通过 URL 参数传递 VNC URL
        encoded_url = urllib.parse.quote(vnc_url, safe='')
        file_url = f"file://{vnc_html_path}?url={encoded_url}"
        webbrowser.open(file_url)

VNC HTML 页面:

vnc.html 页面会从 URL 参数中读取 VNC URL,并自动连接到 VNC 服务器。页面包含以下核心功能:

  1. noVNC 库加载: 从 CDN 动态加载 noVNC 客户端库
  2. 自动连接: 读取 URL 参数中的 VNC URL 并自动连接
  3. 状态显示: 显示连接状态(连接中、已连接、已断开)
  4. 手动控制: 支持手动输入 VNC URL、断开重连等操作

核心 JavaScript 代码片段:

// 从 URL 参数获取 VNC URL
const urlParams = new URLSearchParams(window.location.search);
const vncUrl = urlParams.get('url');
// 加载 noVNC 库
async function loadNoVNC() {
    const module = await import('https://cdn.jsdelivr.net/gh/novnc/noVNC@v1.4.0/core/rfb.js');
    return module.default;
}
// 连接 VNC
async function connectVNC(url) {
    const RFB = await loadNoVNC();
    rfb = new RFB(vncScreen, url, {
        shared: true,
        credentials: { password: '' }
    });
    rfb.addEventListener('connect', () => {
        console.log('VNC 连接成功');
    });
}

完整的 vnc.html 文件可以在示例代码仓库中获取。

手动使用 VNC 查看器:

如果自动打开失败,您也可以手动使用 VNC 查看器:

1. 使用 noVNC 在线客户端:

  • 访问 noVNC 在线客户端 [ 2]
  • 在连接设置中填入 VNC URL
  • 点击连接

2. 使用本地 VNC HTML 页面:

  • 打开 vnc.html
  • 输入 VNC URL
  • 点击连接按钮

实时监控功能:

  • 所有浏览器操作都会在 VNC 中实时显示
  • 可以看到 Agent 的每一步操作(导航、点击、输入等)
  • 方便调试和监控 Agent 行为
  • 支持交互式操作(在 VNC 中直接操作浏览器)

运行和测试

python main.py

程序会自动:

  1. 创建 Browser Sandbox
  2. 打开 VNC 查看器(实时查看浏览器操作)
  3. 执行预设查询
  4. 进入交互模式

工作原理

为了更好地理解系统架构,我们将工作流程拆分为两个部分:LangChain Agent 工作流程和 SandboxManager 生命周期管理

1. LangChain Agent 工作流程

下图展示了 LangChain Agent 如何处理用户请求并调用相应的 Tools:

image

Agent 工作流程说明:

  1. 请求接收: 用户发起自然语言请求(如“访问淘宝首页并截图”)
  2. 意图分析: Agent 分析用户意图,决定需要调用哪些 Tools
  3. Tool 调用: 根据任务需求,顺序或组合调用多个 Tools
  4. Manager 交互: 所有 Tools 都通过 SandboxManager 单例实例操作 Sandbox
  5. 结果处理: Agent 将 Tool 返回的结果整合成用户友好的响应
  6. 多轮对话: Sandbox 在整个会话中保持活跃,支持多轮对话

5 个核心 Tools 的职责:

image

2. SandboxManager 生命周期管理

下图展示了 SandboxManager 如何管理 Sandbox 的完整生命周期:

image

SandboxManager 工作流程说明:

1. 单例管理:

  • 首次调用时创建 Manager 实例
  • 后续调用复用同一个实例
  • 确保整个会话只有一个 Sandbox

2. Sandbox 创建:

  • 调用 AgentRun SDK 的 Sandbox.create()
  • SDK 通过阿里云 API 与函数计算 FC 通信
  • FC 服务创建独立的容器实例,包含:
  • Chromium 浏览器 VNC 服务必要的运行环境

3. 连接信息获取:

  • CDP URL: WebSocket 地址,用于 Playwright/Puppeteer 远程控制浏览器
  • VNC URL: WebSocket 地址,用于实时查看浏览器画面**

4. 浏览器操作:

  • Playwright 通过 CDP URL 连接到远程浏览器
  • 执行各种浏览器操作(导航、点击、截图等)
  • VNC 同步显示操作过程,用户可实时监控

5. 资源清理:

  • 调用 destroy() 方法销毁 Sandbox
  • 清理 Manager 内部状态
  • 通过 SDK 释放云端资源

3. Agent 与 Manager 的协作关系

交互模式:

用户请求 → Agent → Tool → SandboxManager → AgentRun SDK → 云端 Sandbox
                                    ↓
用户响应 ← Agent ← Tool ← SandboxManager ← 操作结果

关键设计理念:

  1. 分层架构:
  • 用户层:自然语言交互
  • Agent 层:意图理解和任务分解
  • Tool 层:功能封装和参数验证
  • Manager 层:资源管理和状态维护
  • SDK 层:云服务通信
  • 云端层:实际的 Sandbox 环境
  1. 单例模式:
  • SandboxManager 使用单例模式
  • 保证整个会话中只有一个 Sandbox 实例
  • 避免资源浪费和状态冲突
  1. 状态复用:
  • Sandbox 在多轮对话中保持活跃
  • 减少创建和销毁的开销
  • 提供更流畅的用户体验
  1. 双通道设计:
  • CDP 通道:Agent 通过 Playwright 控制浏览器
  • VNC 通道:用户通过 VNC 查看器实时监控
  1. 解耦设计:
  • Tools 不直接操作 SDK,通过 Manager 统一管理
  • 便于扩展和维护
  • 统一的错误处理和资源管理

典型使用场景示例:

# 第 1 轮对话
用户: "创建一个 sandbox 并访问淘宝首页"
→ Agent 调用: create_browser_sandbox → navigate_to_url
→ Manager: 创建 Sandbox → Playwright 导航
→ 结果: "Sandbox 已创建,已访问淘宝首页"
# 第 2 轮对话(复用 Sandbox)
用户: "截取当前页面"
→ Agent 调用: take_screenshot
→ Manager: 使用现有 Sandbox → Playwright 截图
→ 结果: "截图已保存"
# 第 3 轮对话(复用 Sandbox)
用户: "访问京东首页"
→ Agent 调用: navigate_to_url
→ Manager: 使用现有 Sandbox → Playwright 导航
→ 结果: "已访问京东首页"

通过这种设计,Agent 专注于理解用户意图和任务编排,而 Manager 专注于 Sandbox 的生命周期管理,实现了清晰的职责分离。

工作原理总结:

  1. 工具注册:使用 @tool 装饰器将 Sandbox 功能封装为 LangChain Tools
  2. 生命周期管理: SandboxManager 负责 Sandbox 的创建、管理和销毁
  3. 状态保持:使用单例模式管理 Sandbox 实例,确保同一会话内复用
  4. VNC 集成:自动获取并返回 VNC URL,方便用户实时查看
  5. 错误处理:所有工具都包含完善的错误处理机制

扩展和定制

添加自定义 Tools:

@tool
def extract_table_data(url: str) -> str:
    """从网页中提取表格数据"""
    from playwright.sync_api import sync_playwright
    manager = get_sandbox_manager()
    cdp_url = manager.get_info()['cdp_url']
    with sync_playwright() as p:
        browser = p.chromium.connect_over_cdp(cdp_url)
        page = browser.contexts[0].pages[0]
        page.goto(url)
        tables = page.query_selector_all("table")
        return f"找到 {len(tables)} 个表格"

自定义提示词:

custom_prompt = """你是一个专业的网页数据提取助手。
在执行任务前,请先创建 sandbox,然后使用浏览器工具完成任务。"""
agent = create_browser_agent(system_prompt=custom_prompt)

最佳实践

  1. 模块化设计:将 Sandbox 管理和 Agent 创建分离,提高代码可维护性
  2. 错误处理:所有工具都应包含完善的错误处理
  3. 资源清理:使用信号处理器确保资源正确清理
  4. VNC 提示:在工具返回中包含 VNC URL,方便用户使用
  5. 单例模式:确保 Sandbox 实例在会话中复用,避免重复创建

前端集成可视化监控(VNC)

VNC 集成架构

下图展示了前端如何集成 VNC 实现实时监控:

image

轻量级 HTML 页面集成

创建一个简单的 vnc-viewer.html 文件:

<!DOCTYPE html>
<html>
<head>
    <title>Browser Sandbox VNC 查看器</title>
    <style>
        body { margin: 0; padding: 0; background: 
#000
; }
        
#vnc
-container { width: 100vw; height: 100vh; }
    </style>
</head>
<body>
    <div id="vnc-container"></div>
    <script type="module">
        const params = new URLSearchParams(window.location.search);
        const vncUrl = params.get('url');
        if (!vncUrl) {
            alert('请提供 VNC URL 参数');
        } else {
            const module = await import('https://cdn.jsdelivr.net/gh/novnc/noVNC@v1.4.0/core/rfb.js');
            const RFB = module.default;
            const rfb = new RFB(
                document.getElementById('vnc-container'),
                vncUrl,
                { shared: true, credentials: { password: '' } }
            );
            rfb.scaleViewport = true;
        }
    </script>
</body>
</html>

使用方式:

import webbrowser
import urllib.parse
vnc_url = sandbox.vnc_url
encoded_url = urllib.parse.quote(vnc_url, safe='')
viewer_url = f"file:///path/to/vnc-viewer.html?url={encoded_url}"
webbrowser.open(viewer_url)

React 应用集成

核心组件代码:

import React, { useEffect, useRef } from 'react';
interface VNCViewerProps {
  vncUrl: string;
  onConnect?: () => void;
  onDisconnect?: () => void;
}
export const VNCViewer: React.FC<VNCViewerProps> = ({ 
  vncUrl, 
  onConnect, 
  onDisconnect 
}) => {
  const containerRef = useRef<HTMLDivElement>(null);
  useEffect(() => {
    let rfb: any;
    const initVNC = async () => {
      if (!containerRef.current || !vncUrl) return;
      const { default: RFB } = await import('@novnc/novnc/core/rfb');
      rfb = new RFB(containerRef.current, vncUrl, {
        shared: true,
        credentials: { password: '' }
      });
      rfb.scaleViewport = true;
      rfb.addEventListener('connect', () => onConnect?.());
      rfb.addEventListener('disconnect', () => onDisconnect?.());
    };
    initVNC();
    return () => {
      if (rfb) rfb.disconnect();
    };
  }, [vncUrl, onConnect, onDisconnect]);
  return (
    <div 
      ref={containerRef} 
      style={{ width: '100%', height: '600px', background: '
#000
' }} 
    />
  );
};

使用示例:

import React, { useState, useEffect } from 'react';
import { VNCViewer } from './VNCViewer';
function App() {
  const [vncUrl, setVncUrl] = useState<string>('');
  useEffect(() => {
    fetch('/api/sandbox/create', { method: 'POST' })
      .then(res => res.json())
      .then(data => setVncUrl(data.vnc_url));
  }, []);
  return (
    <div>
      <h1>Browser Sandbox 实时监控</h1>
      {vncUrl ? (
        <VNCViewer 
          vncUrl={vncUrl}
          onConnect={() => console.log('已连接')}
          onDisconnect={() => console.log('已断开')}
        />
      ) : (
        <p>正在初始化...</p>
      )}
    </div>
  );
}

Puppeteer 和 Playwright 直接集成

如果您更熟悉传统的浏览器自动化库,也可以直接使用 Puppeteer 或 Playwright 连接到 Browser Sandbox。

使用 Playwright

from playwright.sync_api import sync_playwright
from agentrun.sandbox import Sandbox, TemplateType
# 创建 Sandbox
sandbox = Sandbox.create(
    template_type=TemplateType.BROWSER,
    template_name="your-template-name",
    sandbox_idle_timeout_seconds=3000
)
# 使用 Playwright 连接
with sync_playwright() as p:
    browser = p.chromium.connect_over_cdp(sandbox.cdp_url)
    page = browser.contexts[0].pages[0]
    # 执行操作
    page.goto("https://www.example.com")
    page.screenshot(path="screenshot.png")
    content = page.content()
    browser.close()
# 清理
sandbox.delete()

使用 Puppeteer(Node.js)

const puppeteer = require('puppeteer-core');
// CDP URL 从 Sandbox 获取
const cdpUrl = 'wss://your-account.funagent-data-pre.cn-hangzhou.aliyuncs.com/sandboxes/xxx/ws/automation';
(async () => {
  const browser = await puppeteer.connect({
    browserWSEndpoint: cdpUrl,
    defaultViewport: null
  });
  const page = (await browser.pages())[0];
  await page.goto('https://www.example.com');
  await page.screenshot({ path: 'screenshot.png' });
  await browser.close();
})();

总结

通过本教程,您已经学会了:

  1. AgentRun SDK 基础: 如何使用 SDK 创建和管理 Browser Sandbox
  2. LangChain 集成: 如何将 Sandbox 封装为 LangChain Tools
  3. VNC 可视化: 如何在前端集成 VNC 实现实时监控
  4. 直接集成: 如何使用 Puppeteer/Playwright 直接连接 Sandbox

相关链接:

[1] Agentrun 控制台网站

https://functionai.console.aliyun.com/cn-hangzhou/agent/runti...

[2] noVNC 在线客户端

https://novnc.com/noVNC/vnc.html

背景

一开始是通过Api获取数据,但是最近他们增加X-Gnarly参数,而且在github上没有找有效的方案后,放弃api请求,改用页面爬取的方式。彻底避免参数加密校验。

我的环境

    python 3.11 
    selenium 4.39.0
    playwright 1.57.0

评论页面

实现啦抓取第一页和第二页的评论,你们要是抓更多页可以吧第二页改成循环。
执行脚本后会在当前目录生成一份json文件,里面是/api/comment/list/接口返回的数据。

 python3.11 comment_scraper.py "@mahi.islam.oliva/video/7565942090039954706"

image.png

代码如下:

import json
import time
import sys
import base64
import re,os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import argparse



def merge_comments(first_page, second_page):
    """合并两页的评论数据"""
    merged_data = first_page.copy()
    if 'comments' in second_page:
        if 'comments' not in merged_data:
            merged_data['comments'] = []
        merged_data['comments'].extend(second_page['comments'])
    return merged_data

def extract_tiktok_filename(path: str) -> str:
    """
    从 TikTok 路径(如 '@username/video/123456')中提取 'username_123456'
    支持带或不带 @、带 URL 等情况
    """
    # 匹配模式:可选的 @ + 用户名(字母数字下划线.)+ /video/ + 数字ID
    match = re.search(r'@?([\w.]+)/video/(\d{16,})', path)
    if match:
        username = match.group(1)
        video_id = match.group(2)
        return f"{username}_{video_id}"
    else:
        # 如果格式不符,回退到清理后的通用方式
        safe = re.sub(r'[\\/:*?"<>|\s]+', '_', path.strip('@/'))
        return safe[:100]


class TiktokScraper:
    def __init__(self):
        self.comments_data = []
        self.setup_driver()


    def setup_driver(self):

        chrome_options = Options()
        chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})

        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--headless=new")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--disable-infobars")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-gpu")  # 减少 WebGL 差异(可选)
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
        chrome_options.add_argument('user-agent={0}'.format(user_agent))

        self.driver = webdriver.Chrome(options=chrome_options)

        self.driver.execute_cdp_cmd("Emulation.setDeviceMetricsOverride", {
            "width": 1440,
            "height": 900,
            "deviceScaleFactor": 2,  # macOS Retina
            "mobile": False
        })

        # 覆盖 WebGL 参数(关键!)
        self.driver.execute_cdp_cmd("Emulation.setHardwareConcurrencyOverride", {"hardwareConcurrency": 8})
        # 1. 设置基础 UA(CDP 安全方式)
        self.driver.execute_cdp_cmd("Emulation.setUserAgentOverride", {
            "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "platform": "MacIntel"
        })

        # 2. 用 JS 覆盖高级指纹(包括 userAgentData)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
#             delete navigator.__proto__.webdriver;

            Object.defineProperty(navigator, 'platform', { get: () => 'MacIntel' });
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });

            // 伪造 userAgentData
            if (!navigator.userAgentData) {
                Object.defineProperty(navigator, 'userAgentData', {
                    value: {
                        brands: [
                            { brand: "Chromium", version: "120" },
                            { brand: "Google Chrome", version: "120" },
                            { brand: "Not:A-Brand", version: "99" }
                        ],
                        mobile: false,
                        platform: "macOS",
                        getHighEntropyValues: async (hints) => ({
                            architecture: "x86_64",
                            model: "",
                            platform: "macOS",
                            platformVersion: "13.5",
                            uaFullVersion: "120.0.6099.0"
                        })
                    },
                    writable: false,
                    configurable: false
                });
            }
            """
        })

        # 覆盖 WebGL 渲染器(防指纹关键)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(param) {
                if (param === 37445) return 'Apple Inc.'; // UNMASKED_VENDOR_WEBGL
                if (param === 37446) return 'Apple GPU';   // UNMASKED_RENDERER_WEBGL
                return getParameter.call(this, param);
            };
            """
        })
        self.driver.execute_cdp_cmd("Emulation.setTimezoneOverride", {"timezoneId": "America/New_York"})
        self.driver.execute_cdp_cmd("Emulation.setLocaleOverride", {"locale": "en-US"})

    def extract_comment_response_from_logs(self):
        """从 performance 日志中提取评论 API 的完整响应"""
        try:
            logs = self.driver.get_log("performance")
        except Exception as e:
            print(f"获取日志失败: {e}")
            return None

        request_id_to_url = {}
        finished_request_ids = set()

        for entry in logs:
            try:
                message = json.loads(entry["message"])
                method = message.get("message", {}).get("method")
                params = message.get("message", {}).get("params", {})

                if method == "Network.responseReceived":
                    url = params.get("response", {}).get("url", "")
                    request_id = params.get("requestId")
                    if request_id and re.search(r'comment.*list|comments.*aweme', url, re.I):
                        request_id_to_url[request_id] = url

                elif method == "Network.loadingFinished":
                    request_id = params.get("requestId")
                    if request_id:
                        finished_request_ids.add(request_id)
            except Exception:
                continue

        for req_id, url in request_id_to_url.items():
            if req_id in finished_request_ids:
                try:
                    body = self.driver.execute_cdp_cmd(
                        "Network.getResponseBody",
                        {"requestId": req_id}
                    )
                    raw = body.get("body", "{}")
                    if body.get("base64Encoded"):
                        raw = base64.b64decode(raw).decode("utf-8")
                    data = json.loads(raw)
                    if isinstance(data, dict) and ("comments" in data or "item_comments" in data):
                        print(f"✅ 捕获评论接口: {url}")
                        return data
                except Exception as e:
                    print(f"获取响应体失败 (req_id={req_id}): {e}")

        return None

    def scroll_comment_section(self):
        """在 .TUXTabBar-content 内部查找并滚动真正的评论列表容器"""
        script = """
            const tabContent = document.querySelector('.TUXTabBar-content');
            if (!tabContent) {
                console.log('❌ .TUXTabBar-content not found');
                return false;
            }

            // 获取所有子 div
            const candidates = Array.from(tabContent.querySelectorAll('div'));

            // 按 DOM 层级深度排序(优先选深层级的,通常是列表)
            candidates.sort((a, b) => {
                let depthA = 0, depthB = 0;
                let p = a; while (p && p !== tabContent) { depthA++; p = p.parentElement; }
                p = b; while (p && p !== tabContent) { depthB++; p = p.parentElement; }
                return depthB - depthA; // 深的优先
            });

            for (const el of candidates) {
                const style = window.getComputedStyle(el);
                const overflowY = style.overflowY;
                // 必须满足:可滚动 + 有溢出内容
                if ((overflowY === 'auto' || overflowY === 'scroll') &&
                    el.scrollHeight > el.clientHeight) {
                    el.scrollTop = el.scrollHeight+100;
                    console.log('✅ Scrolled real comment container');
                    return true;
                }
            }

            console.log('⚠️ No scrollable child found in .TUXTabBar-content');
            return false;
        """
        try:
            result = self.driver.execute_script(script)
            return result is True
        except Exception as e:
            print(f"滚动执行异常: {e}")
            return False

    def auto_play_and_load_more_comments(self, user_input):

        url = 'https://www.tiktok.com/' + user_input
        print(f"打开视频页面: {url}")
        self.driver.get(url)
        wait = WebDriverWait(self.driver, 20)
        # wait.until(EC.presence_of_element_located((By.TAG_NAME, "video")))
        # 等待评论tab加载完毕
        # wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.TUXTabBar-list")))
        wait.until(EC.presence_of_element_located((By.XPATH, "//span[@data-e2e='comment-icon']")))

        print("视频评论已加载")

        # 点击评论按钮
        try:
            comment_span = wait.until(
                EC.element_to_be_clickable((By.XPATH, '//span[@data-e2e="comment-icon"]'))
            )
            print("正在点击评论图标 (span[@data-e2e='comment-icon'])...")
            self.driver.execute_script("arguments[0].click();", comment_span)
        except Exception as e:
            print(f"无法点击评论按钮: {e}")
            return

#         time.sleep(2)
#         debug_prefix = extract_tiktok_filename(user_input)
#         try:
#             # 保存 HTML
#             with open(f"{debug_prefix}_after_click.html", "w", encoding="utf-8") as f:
#                 f.write(self.driver.page_source)
#             print(f"页面 HTML 已保存: {debug_prefix}_after_click.html")
#
#             # 保存截图
#             self.driver.save_screenshot(f"{debug_prefix}_after_click.png")
#             print(f"页面截图已保存: {debug_prefix}_after_click.png")
#         except Exception as e:
#             print(f"保存调试文件失败: {e}")

        # 加载第一页评论
        first_page_data = self.wait_for_comments(10)
        if not first_page_data:
            print("未捕获到第一页评论")
            return
        self.comments_data.append(first_page_data)

        # 模拟滚动加载更多评论
        # self.driver.execute_script("document.querySelector('.TUXTabBar-content').scrollTo(0, document.querySelector('.TUXTabBar-content').scrollHeight);")
        # 改为调用新方法
        time.sleep(1)
        if self.scroll_comment_section():
            print("已滚动加载更多评论...")
            time.sleep(1)  # 等待新评论加载
        else:
            print("无法滚动评论区,可能结构变化")

        # 加载第二页评论
        second_page_data = self.wait_for_comments(10)
        if second_page_data:
            # 假设每页返回的数据结构相似,合并 comments 字段
            merged_comments = merge_comments(first_page_data, second_page_data)
        else:
            merged_comments = first_page_data
            print("未捕获到第二页评论")


        filename = f"{extract_tiktok_filename(user_input)}.json"
        print(filename)
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(merged_comments, f, ensure_ascii=False, indent=2)
        print(f"评论数据已保存到: {filename}")
        print(f"   共 {len(merged_comments.get('comments', []))} 条评论")

    def wait_for_comments(self, timeout_seconds=10):
        """等待并捕获评论API响应"""
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            comment_data = self.extract_comment_response_from_logs()
            if comment_data:
                return comment_data
            time.sleep(0.5)
        return None

    def close(self):
        if hasattr(self, "driver"):
            self.driver.quit()


def main():
    parser = argparse.ArgumentParser(
        description="Scrape TikTok comments via /api/comment/list/ ")
    parser.add_argument(
        "video_input",
        help="TikTok video URL or video_id, e.g., '/@user/video/7318855966163275054' "
    )
    args = parser.parse_args()

    video_input = args.video_input.strip()
    print(video_input)

    if not video_input:
        print("Error: Video input cannot be empty")
        sys.exit(1)


    scraper = TiktokScraper()
    try:
        scraper.auto_play_and_load_more_comments(video_input)
        time.sleep(1)  # 保持窗口打开以便观察
    finally:
        scraper.close()

    sys.exit(0)

if __name__ == "__main__":
    main()

用户页面发布的视频

这里只实现啦只第一页接口的数据, /api/post/item_list/把这个接口的数据放到啦一个json文件中。
这个页面我做了根据cookie的登陆,其实不登陆应该也可以。cookie 文件是通过chrome扩展 Cookies.txt 生成。登陆TikTok后点击这个扩展下载文件下来就行。
image.png

python3.11 post_item_list.py @dlw2026
image.png

post_item_list.py 代码如下:

# scraper.py
import asyncio
import json
import sys
import argparse
from playwright.async_api import async_playwright
from cookies import load_cookies_safely


# 这是用来抓取用户主页的 /api/post/item_list/


async def scrape_tiktok_user(username):
    target_responses = []
    clean_username = username.lstrip("@")
    output_json = clean_username + "_posts.json"
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
        )

        cookies = load_cookies_safely()
        await context.add_cookies(cookies)
        page = await context.new_page()

         # 隐藏自动化特征
        await page.add_init_script("""
            // 隐藏 webdriver 标志
            delete navigator.__proto__.webdriver;
            window.chrome = { runtime: {} };
            // 伪造 platform 为 Mac
            Object.defineProperty(navigator, 'platform', {
                get: () => 'MacIntel'
            });
            // 伪造 userAgentData(高熵指纹)
            if (!navigator.userAgentData) {
                Object.defineProperty(navigator, 'userAgentData', {
                    value: {
                        brands: [
                            { brand: "Chromium", version: "120" },
                            { brand: "Google Chrome", version: "120" },
                            { brand: "Not:A-Brand", version: "99" }
                        ],
                        mobile: false,
                        platform: "macOS",
                        getHighEntropyValues: async (hints) => ({
                            architecture: "x86_64",
                            model: "",
                            platform: "macOS",
                            platformVersion: "13.5",
                            uaFullVersion: "120.0.6099.0"
                        })
                    },
                    writable: false,
                    configurable: false
                });
            }
        """)

        # ✅ 关键:宽松匹配 API(不再检查 content-type)
        def handle_response(response):
            url = response.url
            if (
                    "/api/post/item_list/" in url
                    and response.status == 200
                    and "tiktok.com" in url
            ):
                if not target_responses:
                    target_responses.append(response)
                    print(f"捕获 API: {url.split('?')[0]}")

        page.on("response", handle_response)

        url = f"https://www.tiktok.com/{username}"
        print(f"打开页面: {url}")
        await page.goto(url, wait_until="domcontentloaded", timeout=50000)

        # 等待用户信息出现
        try:
            await page.wait_for_selector('h1[data-e2e="user-title"]', timeout=15000)
            print("用户主页加载成功")
        except:
            print("用户信息未加载,继续尝试...")

        # 滚动一下,触发懒加载(重要!)
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 3)")

        # 等待 API(最多 20 秒)
        for i in range(40):
            if target_responses:
                break
            if i % 10 == 0:
                print(f"⏳ 等待 API 中... ({i * 0.5}s)")
            await asyncio.sleep(0.5)

        api_data = None
        if target_responses:
            try:
                api_data = await target_responses[0].json()
                print("✅ 成功解析 JSON 数据")
            except Exception as e:
                # 如果 .json() 失败,可能是 text/plain,手动解析
                try:
                    text = await target_responses[0].text()
                    api_data = json.loads(text)
                    print("✅ 通过 .text() 成功解析 JSON")
                except:
                    print(f"❌ 完全无法解析响应: {e}")

        if api_data:
            with open(output_json, "w", encoding="utf-8") as f:
                json.dump(api_data, f, ensure_ascii=False, indent=2)
            items = api_data.get("itemList", [])
            print(f"抓取到 {len(items)} 个视频,已保存至 {output_json}")
        else:
            print("未捕获到任何 API 数据")
            # 调试:打印所有请求(可选)
#             await page.route("**/*", lambda route: print("REQ:", route.request.url) or route.continue_())
#         screenshot_path = f"{clean_username}_homepage.png"
#         await page.screenshot(path=screenshot_path, full_page=True)
#         print(f"已保存页面截图: {screenshot_path}")

        await page.wait_for_timeout(5000)
        await browser.close()
        if api_data:
            return True
        else:
            return False


def main():
    parser = argparse.ArgumentParser(description="Scrape TikTok user profile")
    parser.add_argument("username", help="TikTok username (with or without @), e.g., @dishilife or dishilife")
    args = parser.parse_args()

    username = args.username.strip()
    if not username:
        print("Error: Username cannot be empty")
        sys.exit(1)
    if not username.startswith('@'):
        username = '@' + username
    success = asyncio.run(scrape_tiktok_user(username))
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

cookies.py脚本:

import os
from datetime import datetime



COOKIES_FILE = "cookies.txt"

def load_cookies_safely():
    filepath = COOKIES_FILE
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"❌ Cookie 文件不存在: {os.path.abspath(filepath)}")

    cookies = []
    current_ts = int(datetime.now().timestamp())
    tiktok_domains = {".tiktok.com", "www.tiktok.com"}

    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split("\t")
            if len(parts) < 7:
                continue

            domain = parts[0]
            if domain.startswith("#HttpOnly_"):
                domain = domain[len("#HttpOnly_"):]
            if not domain.startswith("."):
                domain = "." + domain.lstrip(".")

            if not any(t in domain for t in tiktok_domains):
                continue

            cookie = {
                "name": parts[5],
                "value": parts[6],
                "domain": domain,
                "path": parts[2],
                "secure": parts[3].upper() == "TRUE",
            }

            expires_str = parts[4]
            if expires_str.isdigit():
                expires = int(expires_str)
                if expires > current_ts:
                    cookie["expires"] = expires

            cookies.append(cookie)

    if not cookies:
        raise ValueError("❌ 未加载有效 Cookie!请确认包含 sessionid。")
    return cookies

if __name__ == "__main__":
    print('不可以直接执行')

佬们,

我开发了 Linky,一个能自动在各类网站提交链接(发外链)的 AI 工具。

传统工具的痛点:

  • 名单固定: 往往只能在预设的 500-2000 个站点里打转。
  • 极其脆弱: 只要目标网站改个 UI,脚本就报错。
  • 特征明显: 机械化的操作路径很容易被反爬虫系统识别。

我的解决方案:

Linky 结合了 AI(Claude/OpenAI)Playwright。它不再是死记硬背脚本,而是通过理解网页结构来智能操作。

核心亮点:

  • 全网通用: 你给它任何网址,它都能尝试去理解并完成提交,不局限于固定列表。
  • 自适应 UI: 即使网站改版,AI 也能通过语义理解找到正确的输入框和按钮。
  • “动作回放” (Action Replay): AI 第一次成功探索后会记录路径,后续执行零成本,无需重复消耗 Token。
  • 拟人化操作: 模拟真实的点击和录入行为,规避自动化检测。
  • 自带 Key (BYOK): 用户自带 API Key,成本完全透明,不收中间费。

技术栈:

  • 桌面端: Electron + React 19
  • 后端: Python + FastAPI + browser-use (AI 浏览器框架)
  • 自动化: Playwright

目前仅支持 macOS,Windows 版本正在路上。

这还是个正在开发中的早期项目,我不知道大家是否认为它有用。

如果你对这个工具感兴趣的话,欢迎去 GitHub 点个 Star 以给我反馈!

GitHub 传送门: https://github.com/jiweiyeah/linky-ai

(补充:这工具是用 Claude 写的,用 AI 来写一个 AI 工具,这种感觉挺奇妙的 )


📌 转载信息
原作者:
yeheboo
转载时间:
2026/1/20 10:18:52

佬们,

我开发了 Linky,一个能自动在各类网站提交链接(发外链)的 AI 工具。

传统工具的痛点:

  • 名单固定: 往往只能在预设的 500-2000 个站点里打转。
  • 极其脆弱: 只要目标网站改个 UI ,脚本就报错。
  • 特征明显: 机械化的操作路径很容易被反爬虫系统识别。

我的解决方案:

Linky 结合了 AI ( Claude/OpenAI )Playwright。它不再是死记硬背脚本,而是通过理解网页结构来智能操作。

核心亮点:

  • 全网通用: 你给它任何网址,它都能尝试去理解并完成提交,不局限于固定列表。
  • 自适应 UI: 即使网站改版,AI 也能通过语义理解找到正确的输入框和按钮。
  • “动作回放” (Action Replay): AI 第一次成功探索后会记录路径,后续执行零成本,无需重复消耗 Token 。
  • 拟人化操作: 模拟真实的点击和录入行为,规避自动化检测。
  • 自带 Key (BYOK): 用户自带 API Key ,成本完全透明,不收中间费。

技术栈:

  • 桌面端: Electron + React 19
  • 后端: Python + FastAPI + browser-use (AI 浏览器框架)
  • 自动化: Playwright

目前仅支持 macOS ,Windows 版本正在路上。

这还是个正在开发中的早期项目,我不知道大家是否认为它有用。

如果你对这个工具感兴趣的话,欢迎去 GitHub 点个 Star 以给我反馈!

GitHub 传送门: https://github.com/jiweiyeah/linky-ai

(补充:这工具是用 Claude 写的,用 AI 来写一个 AI 工具,这种感觉挺奇妙的 😄)

第一稿,出去吃火锅了…


断断续续研究了将近一天,终于把 cc codex 和 opencode 都迁移到了 wsl 里 了;在这里开个记录帖子,记录下自己遇到的问题以及解决方案,欢迎各位佬友友好交流,分享自己的经验。

1. 迁移动机(废话略多)

  • 最重要的一点,codex 在 windows 上的表现不佳且速度慢
    在 windows 上使用 codex 的佬友一定经常见到过 codex 的 PowerShell 语法错误… 大量的时间和 token 被浪费于此,尽管在提示词里针对 codex 经常犯错的部分进行了提示,但仍无法有效地解决,根本原因还是 pwsh 的训练数据太少,而 unix 类天生就是大家默认的开发环境,数据多,支持地更好。且在 Linux 上 codex、cc 运行起来也更加丝滑

  • opencode 的 windows 兼容过于垃圾
    opencode 在爆火之前我就早早的接触过了,然而… 其对 windows 的支持可谓十分之差,经常出现上一个版本还是可以启动,下一个版本就不能启动了,打开 opencode 的 github 你会发现类似的 issue 层出不穷,最经典的就是渲染出错,启动 opencode,出现 opencode [object] [Object];在经历数次 codex 手动修复让其能够启动后,我受够了。

  • 常用的工具包或插件如 codeagent 和 claude mem 等对于 windows 的支持也很差
    前者是我使用频率最高的 workflow+skill;而后者我更是从来没有在 windows 上成功启动过,考虑到众多插件都会优先支持 Linux 和 MacOS,我还是迁移的好。

2. windows 已有配置

  • cc switch
    我是 cc switch 的忠实用户,ccs 的伟大无需多言;我用到的核心功能主要是

    1. 多配置切换,供应商 + 全局提示词;我有各家的 coding plan 以及自己手写的多套全局提示词,十分需要 ccs 统一管理;

    ccs 在当前版本可以指定 wsl 目录,这使得迁移起来没有什么难度,但是我并没有选择使用。

    1. ccs 的代理和统计功能,开启后可以直观地观测自己每天的 token 用量;ccs 的本地代理功能也是此次迁移较为顺利的根本原因

3. 迁移记录

3.1 基础部分

  1. 安装 codex claude code opencode;过于简单,为了方便,我统一用 npm 了;

  2. 添加供应商配置,这个时候,刚才说过的 ccs 的本地代理功能就派上用场了;只需要把 url 指向 ccs 的代理端口即可,这里以 codex 为例


    我出于一些考量,并没有选择用 ccs 直接配置,你也可以在这里更改配置目录后用 ccs 进行配置,更加自动

3.2 问题来了

mcp 迁移,对于一些 npx nvx 或者 remote 类型的 mcp 很简单,都不需要改;

都知道 wsl 好,为什么我迟迟没有迁移打算呢,问题就在于这里,因为个人手搓的小玩意对于 playwright 的依赖程度较高,而 wsl… 配置图形化… 比较麻烦吧,支持地也不好,当然 这些都不是问题。

在调研站里的方案,以及询问 gpt52pro 后,我有了方案 1:

方案 1: WSL2 里跑 Linux Playwright(WSLg)

在 WSL2 里跑 Linux 浏览器(WSLg 出窗口),这个… 需要做不少的前置准备,且最后的效果也不尽人意,我在跑通后遂放弃。

1 首先必须有显示环境(WSLg)

 echo "$DISPLAY" echo "$WAYLAND_DISPLAY" echo "$XDG_RUNTIME_DIR" 

2 得有浏览器二进制(Playwright 下载的 Chromium/Firefox/WebKit)


npx playwright install

3 必须有系统依赖(Ubuntu 的一堆 .so)

 # 安装系统依赖 sudo env "PATH=$PATH" npx playwright install-deps

这一套跑完,可以按照站里佬的方案持久化安装 playwright 或者 npx 启动;

它的问题是什么呢?WSLg 的浏览器实在太丑了!包括且不限于

  1. 字体的缺失导致渲染出来的网页相当地丑,你根本无法判断是自己 UI 设计问题还是渲染问题
  2. 启动时候默认一个半截窗口,全屏后不会响应式调整页面大小!这个过于逆天

尤其是 2 是我放弃方案 1 的最直接原因;

而方案 2 是我现在使用的方案,我似乎没有看到有佬提及这个方案来着?有一个佬友给出了类似的 chrome dev mcp 的安装方式,所以我就放上来供佬友参考;

方案 2: Windows 侧 Playwright MCP

基本的原理:

WSL2 (Ubuntu): claude / codex
  |
  v
/mnt/c/Windows/System32/cmd.exe /c npx @playwright/mcp@latest
  |
  v
Windows: Node+npx -> @playwright/mcp -> Playwright -> Browser (Windows GUI)

那么很简单了,以 cc 为例子:

"playwright": { "type": "stdio", "command": "/mnt/c/Windows/System32/cmd.exe", "args": ["/c", "npx", "@playwright/mcp@latest"], "env": {} } 

就这么简单

4. 番外部分

4.1 官方订阅问题

当前版本的 ccs 在 codex official oauth 时候不支持本地代理功能,那么如何也让它走统计呢?答案很简单,走 cliproxyapi 这类的反代工具套一层即可,这样子官方订阅也就可以被 ccs 后台统计用量了;

同理,对于 antigravity tool 这类工具,也可以接入到 ccs 里统一参与用量统计;

4.2 opencode 问题

如何让 opencode 也能用上 ccs 的代理功能,其实站里有佬友已经给出解决方案了,以 codex 为例,答案就是覆写 opencode 的 openai 供应商的 base_url 字段,更改其为 ccs 暴露的本地代理端口;

不过我由于日常只有 cc 在用 omo 工作流时候会用到 opencode 的 grok 模型,对 opencode 的研究并不多,当前我发现一个很奇怪的问题,就是 opencode 会请求 gpt-5-nano;这里我还没有找到很好的解决思路。

希望有佬友可以出手!


📌 转载信息
原作者:
FunctorFish
转载时间:
2026/1/18 19:07:52

用 Claude Code 跑自动化测试时遇到过这种情况吗?生成了一堆测试用例,跑完发现 Token 烧了不少,结果一半是废的 —— 要么断言写错,要么压根定位不到元素。

所以我花了点时间把市面上的 AI 测试方案摸了一遍。就两条路,DOM 解析或者视觉识别,先选一个深耕就行。


DOM

原理没什么花活。DOM 就是浏览器把 HTML 页面变成一棵可以被 JavaScript 随意增删改查的树状对象结构。简单说,你写的 HTML 被浏览器变成了一堆 "会动的积木",这些积木可以用 JavaScript 随便抓、改、删、加。

AI 通过选择器找到这些积木,Playwright 负责点点点。就这样。

推荐工具:agent-browser

这是 Vercel 出的一个专门用于 AI Agent 的浏览器自动化 CLI,支持无头模式,可以不打开浏览器进行测试。

# 安装 CLI
npm install -g agent-browser

# 安装驱动
agent-browser install

# 验证安装
agent-browser open baidu.com

# 返回以下内容表示成功 # ✓ 百度一下,你就知道 #   https://www.baidu.com/ 

这条路的好处很直接 ——Token 消耗低,跑得快,CI/CD 接进去顺滑。

但有个前提:你的 DOM 结构得稳定。

纯视觉

这个思路更直观:将页面截图发送给视觉大模型识别,模型返回下一步操作指令,循环执行直到完成全部任务。

推荐工具:Midscene

这是字节开源的纯视觉测试方案,支持 Web、Android、iOS,兼容多种视觉模型:Gemini、Qwen-VL、Doubao-VL。自然语言写测试目标,AI 自己生成脚本。

但 Token 消耗高。跑一轮测试下来成本不低。而且视觉模型偶尔会出问题。

所以视觉方案更像是补充:DOM 搞不定的场景,比如验证 UI 有没有错位、样式有没有问题,再让视觉上。

ChatGPT AgentMode

这是 ChatGPT 的 AgentMode 功能需要订阅企业版或者 Pro 版本 (team 也可以) 它会启动一个虚拟浏览器访问你要求的网址像真人一样在你的平台浏览、点击按钮、账号登录 ,要求最后给我详细的测试报告,如果你的网站已经部署到公网上那么就可以使用 Agent Mode 进行测试。


缺点是最少需要购买 Team 或者 Pro。

怎么选?

场景建议
页面结构稳定,预算有限DOM + Playwright
页面动效多、结构不稳定视觉方案
想检测布局错位、样式问题必须视觉,DOM 看不出来
成本敏感DOM 为主

几个坑提前说

成本控制:先让 AI 生成用例,人审完再跑。不然跑完发现一半用例有问题,Token 白烧了。

CI/CD 集成:配置的时候注意超时设置。视觉方案跑得慢,默认超时可能不够。

自愈能力:市面上很多工具都说支持 "自愈",意思是 UI 改了之后测试脚本能自动调整选择器。实际效果看情况,改个按钮文案能自愈,重构了页面结构还是得手动改。


核心就一句:DOM 打底,视觉补充,别一开始就 all in 最贵的方案。

有实践经验的欢迎评论区交流。


相关链接


📌 转载信息
原作者:
benchen
转载时间:
2026/1/15 18:35:29

AIPex 最新发布了新版本,其中最重要的能力之一,是浏览器任务可以在后台运行,而不打断用户的正常工作流

这一能力并非来自某个“技巧”,而是源于一个明确的工程选择:
我们有意识地避免将浏览器控制建立在 debugger ( Chrome DevTools Protocol )之上。

本文将解释为什么主流方案普遍选择 debugger ,以及 AIPex 为什么在多数智能代理与日常自动化场景中,选择了一条不同的路线。

为什么大多数浏览器控制方案选择 debugger ( CDP )

在当前无需迁移的浏览器自动化插件或 Agent 中,常见方案包括:

  • Manus 的 Manus Browser Operator
  • Claude 推出的 Claude in Chrome
  • 开源社区的 nano browser
  • 以及 Puppeteer / Playwright 等自动化工具的扩展形态

这些方案通常基于 Chrome DevTools Protocol ( CDP ),尤其是其 debugger 能力来实现浏览器控制,原因并不复杂:

1. 能力覆盖完整

CDP 提供了浏览器内部几乎所有关键能力,包括:

  • 页面导航与生命周期控制
  • DOM 与 AXTree ( Accessibility Tree )访问
  • 事件注入(鼠标、键盘、滚轮)
  • 网络拦截与修改
  • 截图、录屏、性能采样

对于复杂自动化而言,CDP 是一个“开箱即用”的全能力接口。


2. 可访问性树( AXTree )高度语义化

通过 CDP ,可以直接获取浏览器构建的 Accessibility Tree

  • 每个节点都具备 role / name / state
  • 天然适合语音辅助与 AI 理解
  • 在理想 ARIA 实现下,语义质量很高

因此,AXTree 成为了许多 AI Agent 的主要页面表达形式。


3. 工程生态成熟

围绕 CDP 已经形成成熟工具链:

  • Puppeteer 、Playwright 等底层实现
  • 完整的文档、示例与社区经验
  • 对自动化工程师而言,学习与接入成本明确


debugger ( CDP )在桌面场景中的现实代价

尽管 CDP 能力强大,但在“与用户并行工作的桌面场景”中,它也带来了一些难以忽视的问题。

1. 前台焦点与用户体验问题

CDP 并非以“后台无打扰”为设计目标。

在真实桌面环境中:

  • debugger attach 往往会触发 Tab 激活或窗口前置
  • 输入与视觉焦点可能被强制抢占
  • 即使通过 headless 或参数规避,也难以在不同平台与浏览器上保证一致行为

结果是:
当用户正在使用其他应用或标签页时,自动化任务可能打断其当前操作,严重影响体验。


2. 浏览器与运行环境耦合

使用 CDP 通常意味着:

  • 需要启用调试端口
  • 强绑定 Chrome / Chromium
  • 对部分嵌入式 WebView 、受限环境或非 Chromium 浏览器支持不佳

在企业环境或多浏览器生态中,这种耦合会显著增加部署与维护成本。


3. 安全与权限摩擦

调试端口、进程权限、证书配置等问题,在企业与受管环境中常常触发:

  • 安全策略拦截
  • 合规审查
  • IT 运维阻力

这类问题并非技术不可解,而是部署摩擦成本过高


为什么浏览器控制不一定需要 debugger

AIPex 的核心设计目标是:

让浏览器任务像“背景思考”一样运行,而不是像“远程操控”一样打断用户。

为此,我们选择了一条不以 debugger 为中心的路径。


AIPex 的方案:DOM 语义快照 + 轻量交互

在页面侧,AIPex 采用纯 JavaScript / TypeScript 能力,实现:

  • 语义化页面快照
  • 稳定节点映射
  • 轻量级事件交互

而不是依赖 CDP 的 AXTree 与调试通道。

1. 语义快照,而非调试树

AIPex 基于 @aipexstudio/dom-snapshot

  • 直接遍历 DOM Tree
  • 提取可访问性相关语义( role / name / state )
  • 不依赖 CDP 的 Accessibility Tree ( AXTree )

该库在 README 中明确说明:
它是一个纯 DOM 方案,而非 CDP 的替代封装。


2. 稳定、可复用的节点 ID

自动为页面元素生成稳定的:data-aipex-nodeid

这使得:

  • “语义快照中的节点”与“真实 DOM 元素”之间的映射可长期复用
  • 避免调试态下常见的选择器漂移问题
  • 支持从文本命中直接反查到可操作元素


3. 面向可交互对象的快照策略

语义快照优先关注:

  • 按钮、链接、输入框等可操作元素
  • 对话与任务相关的界面子集

并过滤:

  • display: none
  • visibility: hidden
  • aria-hidden
  • inert

从而避免将无意义或不可见节点暴露给 Agent 。


4. 文本化表达与语义搜索

快照可被转换为可朗读、可搜索的文本形式( TextSnapshot ):

→uid=dom_abc123 RootWebArea "My Page" <body>
uid=dom_def456 button "提交" <button>
uid=dom_ghi789 textbox "邮箱" <input> desc="请输入邮箱"
StaticText "欢迎回来"
*uid=dom_jkl012 link "了解更多" <a>

其中:

  • 表示当前聚焦元素

→ 表示焦点祖先

该表示既适合 TTS / 语音播报,也支持自然语言驱动的检索。

  1. 语义搜索示例
    支持管道分隔与 glob 搜索:
searchSnapshotText(formatted, '登录 | Login | Sign In');
searchSnapshotText(formatted, 'button* | *submit*', {
  useGlob: true,
  contextLevels: 2
});

命中的文本行可通过 data-aipex-nodeid 精确映射回 DOM 元素。

  1. 页面侧事件,而非调试注入

交互通过页面侧事件完成(如 click 、focus 、input ):

  • 通过内容脚本或扩展消息通道触发

  • 与后台任务调度通信

  • 无需调试端口

  • 不强制拉起前台窗口

网页语义表达的工程视角

在浏览器自动化与 AI Agent 场景中,最常被用作页面表达的主要有两类:

DOM Tree

来源:浏览器原生文档对象模型

特点:信息完整但冗余,语义弱

直接使用不利于 AI 理解与操作

Accessibility Tree ( AXTree )

来源:ARIA 语义派生

特点:高度语义化

局限:

  • 依赖站点 ARIA 实现质量

-节点信息并不完备

  • 远程访问通常依赖 CDP

在实践中,如果完全依赖 AXTree ,Agent 的“感知能力”往往受限于目标网站的可访问性水平——这在现实 Web 中并不理想。

AIPex 的选择与边界

通过对 DOM Tree 进行语义化处理,AIPex 在不依赖 debugger 的前提下,实现了:

  • 后台运行、不打断用户

  • 更完整的页面信息表达

需要说明的是:

对于涉及浏览器特权能力的场景(如网络拦截、性能采样、权限弹窗、文件系统访问等),CDP 仍然具有不可替代的价值。

AIPex 并非否定 debugger ,而是在日常自动化与智能代理场景中,优先选择对用户体验更友好的工程解法。

参考与来源

手里 2api 太多没地方用?别浪费,用来帮你闲鱼捡捡漏。
项目地址:

好久没发帖了,最近把旧坑补了补,前端后端都重构了一遍,试了试,用起来更顺手了,风控也基本规避的差不多了。

在线体验地址:web-ui
登录名:admin
登录密码:admin123
注意 在线体验不要录入自己的 apiKey 信息和闲鱼 cookie 信息!!

点点 Star 点点赞 来点夸夸
I need more credit! plz!


群众里有调皮鬼~把我跑的示例结果都删了 不过不影响 反正是在线体验 随便造


📌 转载信息
转载时间:
2026/1/12 17:05:33

没把我累疯,从今天早上 7 点半一直分析到刚刚,终于把 Kiro 协议注册机给 done 了

看到 佬的帖子 我想自己试试看

不过还是有点小毛病的,指纹生成有大问题,由于时间紧张我是混合了 BrowserForge 和我自己的垃圾指纹,勉强可以过 send otp, 但是会被极速封号,下周再完善一下全部改为 BrowserForge

还有个小问题就是 initizaloAUTH 返回的是个跳转链接,但是它是通过网页动态跳转的而不是 302 到 workflowHandle, 跳转先用 playwright 跳了(反正不是啥重要的,也没有指纹)下周再分析一下 main sso 的跳转逻辑

加密方面,twmic 的算法不难,然后密码那一点还有个加密

元旦收获好大,写了 OpenRouter,Groq,Ollama,Gumloop,ZAI,Qwen 的协议注册机 (Aliyun Captcha 我给他旁路了,100% solve 滑块,类似 turnstile solver)

Cerebras 和 Zencoder 只能浏览器了,recaptcha score 达不到 我准备自己写一个本地的 V2 解决器


📌 转载信息
转载时间:
2026/1/3 15:02:46

项目介绍:

基于 Django、langgraph、langchain 开发的 AI 自动化测试平台。

目前功能:

1、需求文档智能评审并指正需要改正的问题。
2、AI 根据知识库和需求文档生成测试用例。
3、自然语言用例执行并生成对应的 playwright 自动化脚本。
4、执行用例时自动截图上传会平台。
5、批量执行功能测试用例和 playwright 脚本并生成对应的报告。

效果展示



如果项目对你有帮助,请帮我点点 star ~

项目地址: MGdaasLab/WHartTest: WHartTest 是基于 Django REST Framework 与现代大模型技术打造的 AI 驱动测试自动化平台。平台聚合自然语言理解、知识库检索与嵌入搜索能力,结合 LangChain 与 MCP(Model Context Protocol) 工具调用,实现从需求到可执行测试用例的自动化生成与管理,帮助测试团队提升效率与覆盖率。

声明:

本人纯编程小白,项目由本人提供思路,AI 负责功能代码实现。(也算是站在巨人的肩膀上了)


📌 转载信息
原作者:
duanxc
转载时间:
2025/12/30 18:07:51