标签 自动化工具 下的文章

作者:王元

1. 背景与痛点:存量代码的“多语言噩梦”

在前端开发中,将一个成熟的中文存量项目进行国际化多语言(i18n)改造,往往面临着以下困境:

•工作量巨大: 项目包含数百个 .vue/.js/.ts 等文件,散落着成千上万个硬编码的中文字符串。

•人工易错: 手动提取容易遗漏,且极其枯燥,极易产生 Copy/Paste 错误。

•命名困难: 为每一个中文词条想一个语义化的英文 Key(如 homePageTitle)不仅耗时,而且难以保证团队风格统一。

•维护成本高: 翻译文件(zh.ts/en.ts)的维护和代码中的替换需要同步进行,稍有不慎就会导致报错。

如果按照传统的人工查找替换方式,预计需要耗费数周的人力。为了打破这一僵局,我决定利用 JoyCode 结合我开发的 i18n-mcp 工具,打造一套自动化的国际化多语言解决方案。



2. 解决方案:JoyCode + i18n-mcp

我基于 MCP (Model Context Protocol) 开发了一个工具 i18n-mcp,通过 JoyCode 的 AI 能力来调度和执行以下三个核心步骤,实现了从“提取”到“替换”的全链路自动化。

流程图

以下是i18n-mcp的流程图(由JoyCode生成)

在这里插入图片描述

核心流程拆解

第一步:智能提取中文与去重

i18n-mcp 自动扫描所有源文件。利用正则或 AST(抽象语法树)精准识别代码中的中文字符串(包括 Template、Script 和 JSX 部分)。

•全量扫描(full-project-scan工具): 文件过多的时候,全量扫描会有问题。可以通过指定文件夹的方式,扫描该文件夹下面的文件。

•增量扫描(git-change工具):针对git变更的文件,进行扫描。精准定位变更文件,仅处理本次变更涉及的代码,大幅提升效率。

•智能去重: 对提取出的文本进行去重,确保相同的中文文案(如“确认”、“取消”)只生成一个 Key,避免冗余。

第二步:AI 辅助翻译与文件生成

•翻译缓存: 优先查询 数据存储层 中的 Translation Cache,已翻译过的文案直接复用,显著降低 Token 消耗并加速流程。

•自动化翻译: 提取的中文列表没有在缓存中或zh文件中的,被发送给 LLM,自动翻译成英文。

•语义化 Key 生成: 区别于传统 Hash 值,LLM 根据代码上下文(Context)自动生成符合语义的 Key(如将“请输入密码”生成为 pleaseInputPassword),提升代码可读性。

•文件落地: 自动在 lang 文件夹下生成标准的 zh.tsen.ts 文件。



生成示例: zh.ts: { "pleaseSelect": "请选择" } en.ts: { "pleaseSelect": "Please Select" }





第三步:一键代码替换

•变更预览 (Preview): 在实际修改前,可调用 preview-changes 工具展示即将变更的代码对比,确保修改符合预期。

•AST 节点替换: 使用 extract-and-replace 工具,将源代码中的硬编码字符串精准替换为国际化方法(如 $t('pleaseSelect'))。

•无损格式保持: 基于 AST 的替换策略能够完美保留原代码的缩进、换行和注释,修改后的代码无需二次 Lint 即可直接提交。





3. 成果与收益:从“数周”到“数小时”

通过引入 JoyCode + i18n-mcp 的实践,我在项目的国际化改造中取得了显著的成效:

📊 定量收益

维度传统人工方式JoyCode + i18n-mcp提升幅度
单页面改造耗时约 10-30 分钟< 1 分钟效率提升 90%+
词条遗漏率质量显著提升
变量命名耗时需人工构思AI 秒级生成完全自动化

💡 定性收益

1.解放生产力: 从枯燥的“搬运工”工作中解脱出来,可以专注于业务逻辑和核心功能的开发。

2.代码规范统一: AI 生成的 Key 风格高度统一(全驼峰),避免了“千人千面”的命名混乱。

3.可维护性增强: 建立了自动化的语言包管理机制,后续新增词条只需运行脚本即可。



4. i18n-mcp开发

i18n-mcp是我首次开发MCP,整体难度相对较低。对于前端部分,基于github模板进行开发,随后发布至公司NPM私服即可。

核心代码主要由JoyCode的编码功能协助完成。按照上述核心流程步骤通过问答交互的方式,引导JoyCode完成核心代码的开发工作。

整个i18n-mcp架构图如下所示(架构图亦由JoyCode生成)。

在这里插入图片描述



MCP配置如下

{
  "mcpServers": {
    "i18n-mcp": {
      "autoApprove": [],
      "disabled": true,
      "timeout": 180,
      "command": "npx",
      "type": "stdio",
      "transportType": "stdio",
      "args": [
        "-y",
        "@jd/i18n-mcp@latest"
      ],
      "env": {}
    }
  }
}

效果

配置之后,输入prompt “调用i18n-mcp的auto-i18n-process方法”

效果如下:

在这里插入图片描述

5. 总结

尽管目前 i18n-mcp 仍存在一些不足,例如在全面扫描大量文件时可能出现连接错误、翻译和替换结果不够准确等问题,仍需人工进行二次校验,但其在短时间内辅助开发的价值依然显著。在本次实践过程中,我主要通过 JoyCode 的交互式问答完成开发工作。JoyCode 不仅在代码补全方面发挥了重要作用,更凭借其强大的智能调度和自动化执行能力,成为高效处理复杂任务的核心中枢。结合 i18n-mcp 的开发,AI技术的深度赋能得以充分体现,大幅提升了开发的效率。

后续,我将持续研究 AI 在前端开发中的落地场景,充分发挥 AI 辅助开发的强大能力。通过深入探索和应用 AI 技术,进一步释放其在业务创新与效率提升方面的巨大潜力。

作者:王元

1. 背景与痛点:存量代码的“多语言噩梦”

在前端开发中,将一个成熟的中文存量项目进行国际化多语言(i18n)改造,往往面临着以下困境:

•工作量巨大: 项目包含数百个 .vue/.js/.ts 等文件,散落着成千上万个硬编码的中文字符串。

•人工易错: 手动提取容易遗漏,且极其枯燥,极易产生 Copy/Paste 错误。

•命名困难: 为每一个中文词条想一个语义化的英文 Key(如 homePageTitle)不仅耗时,而且难以保证团队风格统一。

•维护成本高: 翻译文件(zh.ts/en.ts)的维护和代码中的替换需要同步进行,稍有不慎就会导致报错。

如果按照传统的人工查找替换方式,预计需要耗费数周的人力。为了打破这一僵局,我决定利用 JoyCode 结合我开发的 i18n-mcp 工具,打造一套自动化的国际化多语言解决方案。



2. 解决方案:JoyCode + i18n-mcp

我基于 MCP (Model Context Protocol) 开发了一个工具 i18n-mcp,通过 JoyCode 的 AI 能力来调度和执行以下三个核心步骤,实现了从“提取”到“替换”的全链路自动化。

流程图

以下是i18n-mcp的流程图(由JoyCode生成)

在这里插入图片描述

核心流程拆解

第一步:智能提取中文与去重

i18n-mcp 自动扫描所有源文件。利用正则或 AST(抽象语法树)精准识别代码中的中文字符串(包括 Template、Script 和 JSX 部分)。

•全量扫描(full-project-scan工具): 文件过多的时候,全量扫描会有问题。可以通过指定文件夹的方式,扫描该文件夹下面的文件。

•增量扫描(git-change工具):针对git变更的文件,进行扫描。精准定位变更文件,仅处理本次变更涉及的代码,大幅提升效率。

•智能去重: 对提取出的文本进行去重,确保相同的中文文案(如“确认”、“取消”)只生成一个 Key,避免冗余。

第二步:AI 辅助翻译与文件生成

•翻译缓存: 优先查询 数据存储层 中的 Translation Cache,已翻译过的文案直接复用,显著降低 Token 消耗并加速流程。

•自动化翻译: 提取的中文列表没有在缓存中或zh文件中的,被发送给 LLM,自动翻译成英文。

•语义化 Key 生成: 区别于传统 Hash 值,LLM 根据代码上下文(Context)自动生成符合语义的 Key(如将“请输入密码”生成为 pleaseInputPassword),提升代码可读性。

•文件落地: 自动在 lang 文件夹下生成标准的 zh.tsen.ts 文件。



生成示例: zh.ts: { "pleaseSelect": "请选择" } en.ts: { "pleaseSelect": "Please Select" }





第三步:一键代码替换

•变更预览 (Preview): 在实际修改前,可调用 preview-changes 工具展示即将变更的代码对比,确保修改符合预期。

•AST 节点替换: 使用 extract-and-replace 工具,将源代码中的硬编码字符串精准替换为国际化方法(如 $t('pleaseSelect'))。

•无损格式保持: 基于 AST 的替换策略能够完美保留原代码的缩进、换行和注释,修改后的代码无需二次 Lint 即可直接提交。





3. 成果与收益:从“数周”到“数小时”

通过引入 JoyCode + i18n-mcp 的实践,我在项目的国际化改造中取得了显著的成效:

📊 定量收益

维度传统人工方式JoyCode + i18n-mcp提升幅度
单页面改造耗时约 10-30 分钟< 1 分钟效率提升 90%+
词条遗漏率质量显著提升
变量命名耗时需人工构思AI 秒级生成完全自动化

💡 定性收益

1.解放生产力: 从枯燥的“搬运工”工作中解脱出来,可以专注于业务逻辑和核心功能的开发。

2.代码规范统一: AI 生成的 Key 风格高度统一(全驼峰),避免了“千人千面”的命名混乱。

3.可维护性增强: 建立了自动化的语言包管理机制,后续新增词条只需运行脚本即可。



4. i18n-mcp开发

i18n-mcp是我首次开发MCP,整体难度相对较低。对于前端部分,基于github模板进行开发,随后发布至公司NPM私服即可。

核心代码主要由JoyCode的编码功能协助完成。按照上述核心流程步骤通过问答交互的方式,引导JoyCode完成核心代码的开发工作。

整个i18n-mcp架构图如下所示(架构图亦由JoyCode生成)。

在这里插入图片描述



MCP配置如下

{
  "mcpServers": {
    "i18n-mcp": {
      "autoApprove": [],
      "disabled": true,
      "timeout": 180,
      "command": "npx",
      "type": "stdio",
      "transportType": "stdio",
      "args": [
        "-y",
        "@jd/i18n-mcp@latest"
      ],
      "env": {}
    }
  }
}

效果

配置之后,输入prompt “调用i18n-mcp的auto-i18n-process方法”

效果如下:

在这里插入图片描述

5. 总结

尽管目前 i18n-mcp 仍存在一些不足,例如在全面扫描大量文件时可能出现连接错误、翻译和替换结果不够准确等问题,仍需人工进行二次校验,但其在短时间内辅助开发的价值依然显著。在本次实践过程中,我主要通过 JoyCode 的交互式问答完成开发工作。JoyCode 不仅在代码补全方面发挥了重要作用,更凭借其强大的智能调度和自动化执行能力,成为高效处理复杂任务的核心中枢。结合 i18n-mcp 的开发,AI技术的深度赋能得以充分体现,大幅提升了开发的效率。

后续,我将持续研究 AI 在前端开发中的落地场景,充分发挥 AI 辅助开发的强大能力。通过深入探索和应用 AI 技术,进一步释放其在业务创新与效率提升方面的巨大潜力。

前言

最近刚结束了一个HVV蓝队,比较头疼,甲方内部设备简直太多了,目前各个大厂都是这么玩儿的,本地MSS不好好适配不同厂家的产品,弹起来适配的问题最好的借口就是需要开发介入调整适配,不单单是在适配不同厂商的防火墙上扯皮还是在适配不同厂商探针上同样扯皮,有这扯皮功夫还不如直接写个工具一键封禁得了。其实就目前防守状态来讲,通过告警事件联动不同区域的防火墙这种技术手段太简单了,本地MSS在态感的基础上优化剧本再加上GPT介入研判已经基本上可以解决绝大部分的告警攻击了,可能目前唯一的问题可能就是出现在厂商态感底层告警逻辑上,目前探针获取的流量也只能获取非SSL流量,不做SSL卸载或者SSL证书解密的话一部分告警是无法获取的,另外常见的横向行为和隐藏流量也只能基于厂商设备底层逻辑或者剧本编排。

这次的工具功能是封禁共享情报的ip,或者是基于不同边界的不同品牌的防火墙和WAF的封禁。

下载地址:

功能介绍

  • 支持多种防火墙设备统一管理(H3C,QAX,SXF,山石,K01,DP等)
  • 网防K01提供黑名单批量查询、添加、删除功能
  • 支持 IP 规则化、过滤、清洗等功能
  • 界面简洁,操作便捷,可以选择性的根据不同的情报源头选择不同边界的设备进行封禁
  • 关于产品型号可支持大部分型号,除网防K01外其它产品封禁实现原理是基于添加地址到地址簿,封禁策略需手动处理

工具介绍和代码逻辑

配置文件

配置文件存放在config/config.json中,需要提前配置json文件,负责无法使用程序。这里关于config的文件内容务必按照格式规则设置,否则无法解析。

DP防火墙

DP防火墙的登录方式是telnet,逻辑是通过添加ip地址进入地址簿,添加ip地址

package devices

import (
	"BT_supertoolsV2/utils"
	"fmt"
	"strings"
	"time"

	"github.com/reiver/go-telnet"
)

type DPConfig struct {
	Name         string `json:"name"`
	DeviceIP     string `json:"device_ip"`
	TelnetPort   int    `json:"telnet_port"`
	Username     string `json:"username"`
	Password     string `json:"password"`
	AddressGroup string `json:"address_group"`
	Type         string `json:"type"`
}

func AddIPsToDP(cfg DPConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 DP 设备 %s ===\n", cfg.DeviceIP))

	// IP 规则化处理
	normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
	if normalizedIPs == "" {
		return "没有有效的 IP 地址!"
	}
	ipList := strings.Split(normalizedIPs, "\n")

	// 连接 Telnet
	address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
	conn, err := telnet.DialTo(address)
	if err != nil {
		return fmt.Sprintf("Telnet 连接失败: %v\n", err)
	}
	defer conn.Close()

	// 封装发送命令函数
	send := func(cmd string) {
		conn.Write([]byte(cmd + "\n"))
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
	}

	// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
	send(cfg.Username)
	send(cfg.Password)
	send("conf")
	for _, ip := range ipList {
		send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
	}
	send("exit")

	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

这里没有直接使用回显显示状态,没有监听服务器回显状态再输入命令,具体使用该方式是新增一个地址簿,再策略位置设置封禁策略,引用添加的地址簿即可。界面效果

H3c防火墙

采用ssh登录使用命令操作

package devices

import (
	"fmt"
	"strings"
	"time"

	"golang.org/x/crypto/ssh"
)

func AddIPsToH3C(cfg H3CConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 H3C 设备 %s ===\n\n", cfg.DeviceIP))

	config := &ssh.ClientConfig{
		User: cfg.Username,
		Auth: []ssh.AuthMethod{
			ssh.Password(cfg.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         10 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
	if err != nil {
		return fmt.Sprintf("SSH连接失败: %v\n", err)
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return fmt.Sprintf("创建会话失败: %v\n", err)
	}
	defer session.Close()

	stdin, err := session.StdinPipe()
	if err != nil {
		return fmt.Sprintf("获取输入管道失败: %v\n", err)
	}

	modes := ssh.TerminalModes{
		ssh.ECHO:          0,
		ssh.TTY_OP_ISPEED: 14400,
		ssh.TTY_OP_OSPEED: 14400,
	}
	if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
		return fmt.Sprintf("设置终端失败: %v\n", err)
	}
	if err := session.Shell(); err != nil {
		return fmt.Sprintf("启动 shell 失败: %v\n", err)
	}

	commands := []string{
		"system-view",
		fmt.Sprintf("object-group ip  %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network host address %s", ip))
	}
	commands = append(commands, "exit")

	for _, cmd := range commands {
		fmt.Fprintf(stdin, "%s\n", cmd)
		time.Sleep(300 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
	}
	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

这里采用了监听回显,核心代码是

commands := []string{
		"system-view",
		fmt.Sprintf("object-group ip  %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network host address %s", ip))
	}
	commands = append(commands, "exit")

当然这里可以增加优化,代码内关于ip地址的添加以及删除等操作都可以做,工具后期可以依托命令功能增加多个模块化设计,当然为了降低操作风险的话,这里黑名单封禁的操作足够使用了。

山石防火墙

山石登录方式是telnet登录使用命令操作

import (
	"BT_supertoolsV2/utils"
	"fmt"
	"strings"
	"time"

	"github.com/reiver/go-telnet"
)

type HSConfig struct {
	Name         string `json:"name"`
	DeviceIP     string `json:"device_ip"`
	TelnetPort   int    `json:"telnet_port"`
	Username     string `json:"username"`
	Password     string `json:"password"`
	AddressGroup string `json:"address_group"`
	Type         string `json:"type"`
}

func AddIPsToHillstone(cfg HSConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置 Hillstone 设备 %s ===\n", cfg.DeviceIP))

	// IP 规则化处理
	normalizedIPs := utils.NormalizeIP(strings.Join(ips, "\n"))
	if normalizedIPs == "" {
		return "没有有效的 IP 地址!"
	}
	ipList := strings.Split(normalizedIPs, "\n")

	// 连接 Telnet
	address := fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.TelnetPort)
	conn, err := telnet.DialTo(address)
	if err != nil {
		return fmt.Sprintf("Telnet 连接失败: %v\n", err)
	}
	defer conn.Close()

	// 封装发送命令函数
	send := func(cmd string) {
		conn.Write([]byte(cmd + "\n"))
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[发送] %s\n", cmd))
	}

	// 开始发送指令:用户名 -> 密码 -> conf -> 封禁命令 -> exit
	send(cfg.Username)
	send(cfg.Password)
	send("conf")
	for _, ip := range ipList {
		send(fmt.Sprintf("address-object %s %s/32", cfg.AddressGroup, ip))
	}
	send("exit")

	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

山石防火墙的命令基本和华三防火墙的命令基本一致,目前基本上可以支持大多数版本型号的防火墙,使用前可以做测试。

网盾K01

网盾K01的话这里采用的是关于api的利用,目前关于网盾K01的黑名单添加的模式是和防火墙不一致的,可以采用接口的方式增加。目前代码中关于黑名单添加的事件设置的是3600小时。

// 批量封禁
func AddBlacklist(devices []K01Device, ips []string, output *widget.Entry) {
	for _, device := range devices {
		appendOutput(output, fmt.Sprintf("🔑 正在登录设备 [https://%s]...", device.IP))

		// 登录并获取 Token
		url := fmt.Sprintf("https://%s", device.IP)
		token := Login(url, device.Username, device.Password, output, device.Name)
		if token == "" {
			appendOutput(output, fmt.Sprintf("❌ 登录失败: %s", device.Name))
			continue
		}

		// 逐个 IP 添加到黑名单
		for _, ip := range ips {
			// 如果 IP 地址没有 CIDR 后缀,添加 "/32"
			if !strings.Contains(ip, "/") {
				ip += "/32"
			}

			// 打印调试日志,确认每个 IP 地址
			appendOutput(output, fmt.Sprintf("🔍 封禁 IP: %s", ip))

			// 准备请求 payload
			payload := map[string]interface{}{
				"color":       0,
				"device_mask": []int{224},
				"items": []map[string]interface{}{
					{
						"type":        0,
						"ip":          ip, // 这里单独封禁每个 IP 地址
						"timeout":     336,
						"time_type":   "3600",
						"device_mask": []int{224},
						"comment":     "自动封禁",
					},
				},
				"method": "add",
			}

			// 打印调试日志,确认请求 Payload
			payloadBytes, _ := json.Marshal(payload)
			// appendOutput(output, fmt.Sprintf("🔍 请求 Payload: %s", string(payloadBytes)))

			// 请求封禁
			req, err := http.NewRequest("POST", url+"/api/v1/security/iplist/save", bytes.NewBuffer(payloadBytes))
			if err != nil {
				appendOutput(output, fmt.Sprintf("❌ 创建封禁请求失败: %s", err.Error()))
				continue
			}
			req.Header.Set("Authorization", "Bearer "+token)
			req.Header.Set("Content-Type", "application/json")

			// 执行请求
			resp, err := insecureClient.Do(req)
			if err != nil {
				appendOutput(output, fmt.Sprintf("❌ 封禁请求失败: %s", err.Error()))
				continue
			}
			defer resp.Body.Close()

			// // 打印调试日志,确认响应代码
			// appendOutput(output, fmt.Sprintf("🔍 响应状态: %s", resp.Status))

			// 解析响应
			body, _ := ioutil.ReadAll(resp.Body)
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				appendOutput(output, fmt.Sprintf("❌ 解析封禁响应失败: %s", err.Error()))
				continue
			}

			// // 打印调试日志,确认响应结果
			// appendOutput(output, fmt.Sprintf("🔍 响应结果: %v", result))

			// 根据结果显示成功或失败
			if success, ok := result["success"].(bool); ok && success {
				appendOutput(output, fmt.Sprintf("✅ 添加成功: %s", ip))
			} else {
				appendOutput(output, fmt.Sprintf("❌ 添加失败: %v", result["msg"]))
			}
		}
	}
}

// DeleteBlacklist 删除 IP 从黑名单
func DeleteBlacklist(devices []K01Device, selected []int, ipList []string, outputBox *widget.Entry) {
	for _, idx := range selected {
		device := devices[idx] // 获取当前选择的设备
		url := fmt.Sprintf("https://%s", device.IP)

		// 输出正在登录设备信息
		appendOutput(outputBox, fmt.Sprintf("🔄 正在登录设备【%s】...", device.Name))

		// 登录设备,获取 token
		token := Login(url, device.Username, device.Password, outputBox, device.Name)
		if token == "" {
			appendOutput(outputBox, fmt.Sprintf("❌ 设备【%s】登录失败,无法删除黑名单", device.Name))
			continue
		}

		// 遍历 IP 列表,直接进行删除操作
		for _, ip := range ipList {
			apiUrl := fmt.Sprintf("%s/api/v1/security/iplist/save", url)
			payload := fmt.Sprintf(`{
				"color": 0,
				"items": [{"id": "%s/32;0;0", "type": 0, "ip": "%s/32", "device_mask": [224]}],
				"method": "delete"
			}`, ip, ip)

			// 创建请求
			req, err := http.NewRequest("POST", apiUrl, bytes.NewBuffer([]byte(payload)))
			if err != nil {
				appendOutput(outputBox, "❌ 创建删除请求失败: "+err.Error())
				continue
			}
			req.Header.Set("Authorization", "Bearer "+token)
			req.Header.Set("Content-Type", "application/json")

			// 发送请求
			resp, err := insecureClient.Do(req)
			if err != nil {
				appendOutput(outputBox, "❌ 删除请求失败: "+err.Error())
				continue
			}
			defer resp.Body.Close()

			// 读取响应
			body, err := ioutil.ReadAll(resp.Body)
			if err != nil {
				appendOutput(outputBox, "❌ 读取删除响应失败: "+err.Error())
				continue
			}

			// 解析响应
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				appendOutput(outputBox, "❌ 解析删除响应失败: "+err.Error())
				continue
			}

			// 检查删除是否成功
			if success, ok := result["success"].(bool); ok && success {
				appendOutput(outputBox, fmt.Sprintf("✅ 删除成功,IP: %s", ip))
			} else {
				// 如果失败,输出错误信息
				if msg, ok := result["msg"].(string); ok {
					appendOutput(outputBox, fmt.Sprintf("❌ 删除失败: %s", msg))
				} else {
					appendOutput(outputBox, "❌ 删除失败,未知错误")
				}
			}
		}
	}

关于api的调用的话是有其自己的参数规则的,因为认证方式是https,所以关于身份认证的话需要忽略ssl证书。因为考虑到删除函数的逻辑需先查询要封禁的黑名单是否在黑名单列表,所以这里就执行的是直接删除,否则就删除失败,但是对于功能性的查询模块的功能是有的,由于网盾k01的产品特性,一点发现全网阻断,所以这里基本上用不到删除黑名单的功能。

QAX网神防火墙

网神联动利用的是ssh登录执行命令,所以这里权限也相对来说比较大,对于蓝队来讲不建议增加其它代码功能,原理也是利用策略引用地址簿进行封禁

func AddIPsToQAX(cfg QAXConfig, ips []string) string {
	var output strings.Builder
	output.WriteString(fmt.Sprintf("=== 开始配置奇安信设备 %s ===\n\n", cfg.DeviceIP))

	config := &ssh.ClientConfig{
		User: cfg.Username,
		Auth: []ssh.AuthMethod{
			ssh.Password(cfg.Password),
		},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         15 * time.Second,
	}

	client, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", cfg.DeviceIP, cfg.SSHPort), config)
	if err != nil {
		return fmt.Sprintf("SSH连接失败: %v\n", err)
	}
	defer client.Close()

	session, err := client.NewSession()
	if err != nil {
		return fmt.Sprintf("创建会话失败: %v\n", err)
	}
	defer session.Close()

	stdin, err := session.StdinPipe()
	if err != nil {
		return fmt.Sprintf("获取输入管道失败: %v\n", err)
	}

	modes := ssh.TerminalModes{
		ssh.ECHO:          0,
		ssh.TTY_OP_ISPEED: 14400,
		ssh.TTY_OP_OSPEED: 14400,
	}
	if err := session.RequestPty("xterm", 80, 40, modes); err != nil {
		return fmt.Sprintf("设置终端失败: %v\n", err)
	}
	if err := session.Shell(); err != nil {
		return fmt.Sprintf("启动shell失败: %v\n", err)
	}

	commands := []string{
		"config terminal",
		fmt.Sprintf("object address %s", cfg.AddressGroup),
	}
	for _, ip := range ips {
		commands = append(commands, fmt.Sprintf("network %s 32", ip))
	}
	commands = append(commands, "exit")

	for _, cmd := range commands {
		fmt.Fprintf(stdin, "%s\n", cmd)
		time.Sleep(500 * time.Millisecond)
		output.WriteString(fmt.Sprintf("[执行] %s\n", cmd))
	}
	output.WriteString("\n=== 配置完成 ===\n")
	return output.String()
}

向地址簿中添加ip地址核心代码

commands := []string{
	"config terminal",
	fmt.Sprintf("object address %s", cfg.AddressGroup),
}
for _, ip := range ips {
	commands = append(commands, fmt.Sprintf("network %s 32", ip))
}
commands = append(commands, "exit")

目前我也是查询了多款网神防火墙的手册,关于地址簿添加这块儿的命令版本是没有变化的,基本上支持大多数版本型号。

SXF_AF和WAF

这里SXF的设备是有两种模式可供选择的,但是目前SXF的ssh登录一般是由两段密码组成还有可能经常性的修改,所以这里使用的API的方式向地址簿中添加要封禁的IP地址,但是这里的话,策略务必要配置正确。

// 新增的结构体,用于表示包含 devices 字段的 JSON 数据结构
type DeviceList struct {
	Devices []SXFDevice `json:"devices"`
}

// 登录响应结构体
type loginResponse struct {
	Code int `json:"code"`
	Data struct {
		LoginResult struct {
			Token string `json:"token"`
		} `json:"loginResult"`
	} `json:"data"`
	Message string `json:"message"`
}

// 添加 IP 请求结构体
type ipRange struct {
	Start string `json:"start"`
}

type addIPRequest struct {
	IPRanges []ipRange `json:"ipRanges"`
}

// ✅ 防冲突:明确为 SXF 的登录函数
// 登录获取 token
func SXFLogin(device SXFDevice) (string, error) {
	// 打印设备信息,确保 IP、用户名和密码正确
	fmt.Printf("设备信息: IP=%s, Username=%s, Password=%s\n", device.IP, device.Username, device.Password)
	url := fmt.Sprintf("https://%s/api/v1/namespaces/public/login", device.IP)

	payload := map[string]string{
		"name":     device.Username,
		"password": device.Password,
	}
	jsonData, _ := json.Marshal(payload)

	client := &http.Client{
		Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}},
	}
	resp, err := client.Post(url, "application/json", bytes.NewBuffer(jsonData))
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)

	// 打印响应内容进行调试
	fmt.Println("响应体内容:", string(body))

	var result struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
		Data    struct {
			LoginResult struct {
				Token string `json:"token"`
			} `json:"loginResult"`
		} `json:"data"`
	}

	if err := json.Unmarshal(body, &result); err != nil {
		return "", fmt.Errorf("解析登录响应失败: %v", err)
	}

	// 检查返回的 code
	if result.Code != 0 {
		return "", fmt.Errorf("登录失败: %s", result.Message)
	}

	// 返回 token
	return result.Data.LoginResult.Token, nil
}

// 加载 JSON 文件并解析为设备切片
// 加载 JSON 文件并解析为设备切片
func loadSXFDevices(filePath string) ([]SXFDevice, error) {
	data, err := os.ReadFile(filePath)
	if err != nil {
		return nil, fmt.Errorf("设备配置文件加载失败: %v", err)
	}

	// 输出加载的 JSON 数据(用于调试)
	fmt.Printf("加载的 JSON 数据: %s\n", string(data))

	var devices []SXFDevice
	err = json.Unmarshal(data, &devices)
	if err != nil {
		return nil, fmt.Errorf("设备解析失败: %v", err)
	}

	// 输出解析后的设备信息(调试)
	fmt.Printf("加载的设备数量: %d\n", len(devices))
	for _, device := range devices {
		// 输出每台设备的详细信息,特别是 AddressGroup 字段
		fmt.Printf("设备信息:Name=%s, IP=%s, AddressGroup=%s\n", device.Name, device.IP, device.AddressGroup)
	}

	return devices, nil
}

// ✅ 添加多个 IP 到 SXF 地址组(支持日志回调)
// 在 AddToSXFBlacklist 函数内部,确保请求头正确设置
// 设备添加到黑名单的函数
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
// 添加多个 IP 到 SXF 地址组(支持日志回调)
// AddToSXFBlacklist 将 IP 地址添加到 SXF 防火墙的黑名单
func AddToSXFBlacklist(devices []SXFDevice, ips []string, logFn func(string)) error {

	for _, device := range devices {
		// 1. 登录
		token, err := SXFLogin(device)
		if err != nil {
			logFn(fmt.Sprintf("【%s】❌ 登录失败: %v", device.Name, err))
			continue
		}
		logFn(fmt.Sprintf("【%s】✅ 登录成功", device.Name))

		// 2. 添加每个IP
		for _, ip := range ips {
			ipRanges := []map[string]string{{"start": ip, "end": ip}}
			requestData := map[string]interface{}{"ipRanges": ipRanges}
			requestDataJSON, _ := json.Marshal(requestData)

			// 3. 发送请求
			url := fmt.Sprintf(
				"https://%s/api/v1/namespaces/public/ipgroups/%s?_arrayop=add",
				device.IP,
				device.AddressGroup,
			)
			req, _ := http.NewRequest("PATCH", url, bytes.NewBuffer(requestDataJSON))
			req.Header = http.Header{
				"token":        []string{token},
				"Content-Type": []string{"application/json"},
			}

			client := &http.Client{
				Transport: &http.Transport{
					TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
				},
			}
			resp, err := client.Do(req)
			if err != nil {
				logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %v", device.Name, ip, err))
				continue
			}
			defer resp.Body.Close()

			// 4. 处理响应
			body, _ := io.ReadAll(resp.Body)
			var result map[string]interface{}
			if err := json.Unmarshal(body, &result); err != nil {
				logFn(fmt.Sprintf("【%s】❌ IP %s 响应解析失败: %v", device.Name, ip, err))
				continue
			}

			if code, ok := result["code"].(float64); ok && code == 0 {
				logFn(fmt.Sprintf("【%s】✅ IP %s 已添加到地址组[%s]",
					device.Name, ip, device.AddressGroup))
			} else {
				msg, _ := result["message"].(string)
				logFn(fmt.Sprintf("【%s】❌ IP %s 添加失败: %s", device.Name, ip, msg))
			}
		}
	}
	return nil
}

如果想要扩展功能的话不建议使用API去扩展,毕竟太麻烦了,各种参数比较麻烦,没有ssh的方式登录使用命令操作简便。

整体界面效果

总结

其它厂商的防火墙确实没找到手册,有的是找到手册没有测试的设备,目前上述的设备和代码是完全没有问题的,如果想添加其它厂商的设备,可以给我操作手册,我这边更新新版本发布。每次更新后的授权时间是3个月

Gemini Business 自动注册 & 2API 上传工具

基于大佬们的开源成果,整合了定时注册自动上传过期剔除等功能。实现全自动化的账号池维护。

核心逻辑优化

  • 自动维护:定时注册并直接传到 2API,自动剔除已过期账号,保留可用账号。
  • 失败重试:修改了注册机逻辑,设定申请 N 个,即使中间失败,也会一直重试直到成功申请到 N 个为止。
  • 丰俭由人:建议每 11 小时注册 2-3 个即可满足个人使用。避免对随机邮箱大佬提供不必要的压力。


感谢各位大佬的无私奉献:


环境准备

请确保已安装 Python 环境,并安装以下依赖库:

pip install undetected-chromedriver selenium beautifulsoup4 requests pystray pillow

配置说明

1. 先去 hf 部署一个 2api, 记住 API 地址admin_key
2. 新建并复制代码生成 py 文件,右键编辑,在顶部的配置区域填入你的 2API 信息:

 # 服务器 API 配置 API_HOST = "请输入你的服务器API地址" ADMIN_KEY = "请输入你的管理员密钥" # 无头模式开关 (True=后台运行无窗口, False=显示浏览器窗口) HEADLESS_MODE = True ##无头注册率会低点,但胜在静默,结合重试其实体验更好。 

运行脚本

在命令行中执行:

py gemini_auto.py

代码如下:

gemini_auto.py
"""
Gemini Business 自动注册上传工具
"""

# 标准库
import sys
import json
import time
import random
from pathlib import Path
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse, parse_qs
from concurrent.futures import ThreadPoolExecutor

# 第三方库
import requests
from bs4 import BeautifulSoup
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


# ==================== 配置区域 ====================
# 服务器 API 配置
API_HOST = "请输入你的服务器API地址"
ADMIN_KEY = "请输入你的管理员密钥"

# 临时邮箱 API 配置
MAIL_API = "https://mail.chatgpt.org.uk"
MAIL_KEY = "gpt-test"

# Gemini 登录页面
LOGIN_URL = "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZS1hY2JkLThmOGY2ZDE0ODM1Mg"

# 本地账号文件
ACCOUNTS_FILE = "accounts.json"

# 页面元素定位
XPATH = {
    "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
    "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
    "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button",
}

# 随机姓名池
NAMES = [
    "James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones",
    "David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez",
    "Elizabeth Taylor", "Richard Moore", "Susan Wilson", "Joseph Anderson", "Jessica Thomas",
    "Charles Jackson", "Sarah White", "Christopher Harris", "Karen Martin", "Daniel Thompson",
    "Thomas Garcia", "Nancy Martinez", "Matthew Robinson", "Lisa Clark", "Anthony Lewis",
    "Betty Walker", "Mark Young", "Margaret Allen", "Donald King", "Sandra Wright"
]

# 全局停止标志 (用于 GUI 停止任务)
STOP_FLAG = False

# 无头模式开关 (True=后台运行无窗口, False=显示浏览器窗口)
HEADLESS_MODE = True
# ==================================================


# ==================== 工具函数 ====================
def print_log(msg, level="INFO"):
    """统一日志输出格式"""
    icons = {"INFO": "→", "WARN": "⚠", "ERROR": "✗", "OK": "✓"}
    icon = icons.get(level, "•")
    print(f"{icon} {msg}")


def print_separator(char="=", length=80):
    """打印分隔线"""
    print(char * length)


def print_progress(current, total, success, fail, avg_time):
    """打印进度信息"""
    print(f"\n>>> 进度: {current}/{total} | 成功: {success} | 失败: {fail} | 平均耗时: {avg_time:.1f}s")


def log_error(email, error_msg):
    """记录错误到日志文件"""
    error_file = Path("errors.log")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] 邮箱: {email} | 错误: {error_msg}\n"
    
    try:
        with open(error_file, "a", encoding="utf-8") as f:
            f.write(log_entry)
        print_log(f"错误已记录到 errors.log", "INFO")
    except Exception as e:
        print_log(f"写入错误日志失败: {e}", "WARN")


# ==================== 邮箱管理 ====================
email_queue = []


def create_temp_email():
    """创建临时邮箱地址"""
    try:
        response = requests.get(
            f"{MAIL_API}/api/generate-email",
            headers={"X-API-Key": MAIL_KEY},
            timeout=30
        )
        if response.status_code == 200 and response.json().get('success'):
            email = response.json()['data']['email']
            return email
    except Exception as e:
        print_log(f"邮箱服务异常: {e}", "❌")
    return None


def prefetch_email():
    """预创建邮箱并加入队列"""
    email = create_temp_email()
    if email:
        email_queue.append(email)


def get_email():
    """获取邮箱地址(优先使用队列中的)"""
    if email_queue:
        email = email_queue.pop(0)
        print_log(f"邮箱就绪 → {email}")
        return email
    
    email = create_temp_email()
    if email:
        print_log(f"已生成 → {email}")
    return email


def fetch_verification_code(email, timeout=60):
    """获取邮箱验证码"""
    print_log("等待邮件验证码...")
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        try:
            response = requests.get(
                f"{MAIL_API}/api/emails",
                params={"email": email},
                headers={"X-API-Key": MAIL_KEY},
                timeout=10
            )
            
            if response.status_code == 200:
                emails = response.json().get('data', {}).get('emails', [])
                if emails:
                    html_content = emails[0].get('html_content') or emails[0].get('content', '')
                    soup = BeautifulSoup(html_content, 'html.parser')
                    code_element = soup.find('span', class_='verification-code')
                    
                    if code_element:
                        code = code_element.get_text().strip()
                        if len(code) == 6:
                            print_log(f"验证码 → {code}", "OK")
                            return code
        except:
            pass
        
        elapsed = int(time.time() - start_time)
        print(f"  等待中... ({elapsed}s)", end='\r')
        time.sleep(2)
    
    print_log("验证码超时,请检查网络", "ERROR")
    return None


# ==================== 账号注册 ====================
def save_account_config(email, driver, timeout=10):
    """提取并保存账号配置信息"""
    print_log(f"提取账号配置中(最多 {timeout}s)...")
    start_time = time.time()
    account_data = None

    while time.time() - start_time < timeout:
        cookies = driver.get_cookies()
        current_url = driver.current_url
        parsed_url = urlparse(current_url)

        # 提取 config_id
        url_parts = current_url.split('/')
        config_id = None
        for i, part in enumerate(url_parts):
            if part == 'cid' and i + 1 < len(url_parts):
                config_id = url_parts[i + 1].split('?')[0]
                break

        # 提取关键 cookies
        cookie_map = {c['name']: c for c in cookies}
        session_cookie = cookie_map.get('__Secure-C_SES', {})
        host_cookie = cookie_map.get('__Host-C_OSES', {})

        # 提取 csesidx
        csesidx = parse_qs(parsed_url.query).get('csesidx', [None])[0]

        # 验证所有必需字段
        if all([
            session_cookie.get('value'),
            host_cookie.get('value'),
            csesidx,
            config_id
        ]):
            expiry_timestamp = session_cookie.get('expiry', 0) - 43200
            expires_at = datetime.fromtimestamp(expiry_timestamp).strftime('%Y-%m-%d %H:%M:%S') if expiry_timestamp > 0 else None
            
            account_data = {
                "id": email,
                "csesidx": csesidx,
                "config_id": config_id,
                "secure_c_ses": session_cookie.get('value'),
                "host_c_oses": host_cookie.get('value'),
                "expires_at": expires_at
            }
            
            elapsed = time.time() - start_time
            print_log(f"配置提取完成 ({elapsed:.1f}s)", "OK")
            break

        time.sleep(1)

    if not account_data:
        print_log(f"配置不完整,已跳过 → {email}", "WARN")
        return None

    # 保存到文件
    existing_accounts = []
    if Path(ACCOUNTS_FILE).exists():
        try:
            with open(ACCOUNTS_FILE, 'r', encoding='utf-8') as f:
                existing_accounts = json.load(f)
        except:
            pass
    
    existing_accounts.append(account_data)
    
    with open(ACCOUNTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(existing_accounts, f, indent=2, ensure_ascii=False)
    
    print_log(f"已保存 → {ACCOUNTS_FILE}", "OK")
    return account_data


def fast_type(element, text, delay=0.02):
    """快速输入文本"""
    for c in text:
        element.send_keys(c)
        time.sleep(delay)


def register_single_account(driver, executor):
    """注册单个账号 (来自 app.py 的简洁版本)"""
    start_time = time.time()
    email = get_email()
    if not email:
        return None, False, None, 0

    wait = WebDriverWait(driver, 30)

    try:
        # 1. 访问登录页
        driver.get(LOGIN_URL)
        
        # 检测空白页
        time.sleep(2)
        page_source = driver.page_source
        if len(page_source) < 500 or "about:blank" in driver.current_url:
            raise Exception("页面加载空白,需要重启浏览器")

        # 2. 输入邮箱
        print_log("输入邮箱...")
        inp = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["email_input"])))
        inp.click()
        inp.clear()
        fast_type(inp, email)
        
        # 验证邮箱是否成功输入
        time.sleep(0.3)
        actual_value = inp.get_attribute("value")
        if actual_value != email:
            print_log(f"输入验证失败,清空后重新输入...", "WARN")
            # 清空后用 JS 输入
            driver.execute_script("arguments[0].value = '';", inp)
            time.sleep(0.1)
            driver.execute_script("arguments[0].value = arguments[1];", inp, email)
            # 触发 input 事件
            driver.execute_script("""
                var event = new Event('input', { bubbles: true });
                arguments[0].dispatchEvent(event);
            """, inp)
            time.sleep(0.3)
        
        print_log(f"邮箱 → {email}", "OK")

        # 3. 点击继续
        time.sleep(0.5)
        btn = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["continue_btn"])))
        driver.execute_script("arguments[0].click();", btn)
        print_log("继续下一步", "OK")

        # 异步预创建下一个邮箱
        executor.submit(prefetch_email)

        # 4. 获取验证码
        time.sleep(2)
        code = fetch_verification_code(email)
        if not code:
            return email, False, None, time.time() - start_time

        # 5. 输入验证码
        time.sleep(1)
        print_log(f"输入验证码 → {code}")
        try:
            pin = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='pinInput']")))
            pin.click()
            time.sleep(0.1)
            fast_type(pin, code, 0.05)
        except:
            try:
                span = driver.find_element(By.CSS_SELECTOR, "span[data-index='0']")
                span.click()
                time.sleep(0.2)
                driver.switch_to.active_element.send_keys(code)
            except Exception as e:
                print_log(f"验证码输入失败: {e}", "ERROR")
                return email, False, None, time.time() - start_time

        # 6. 点击验证
        time.sleep(0.5)
        try:
            vbtn = driver.find_element(By.XPATH, XPATH["verify_btn"])
            driver.execute_script("arguments[0].click();", vbtn)
        except:
            for btn in driver.find_elements(By.TAG_NAME, "button"):
                if '验证' in btn.text:
                    driver.execute_script("arguments[0].click();", btn)
                    break
        print_log("提交验证", "OK")

        # 7. 输入姓名
        print_log("等待姓名输入...")
        selectors = [
            "input[formcontrolname='fullName']",
            "input[placeholder='全名']",
            "input[placeholder='Full name']",
            "input#mat-input-0",
        ]
        name_inp = None

        # 轮询检测姓名输入框
        for _ in range(30):
            for sel in selectors:
                try:
                    name_inp = driver.find_element(By.CSS_SELECTOR, sel)
                    if name_inp.is_displayed():
                        break
                except:
                    continue
            if name_inp and name_inp.is_displayed():
                break
            time.sleep(1)

        if name_inp and name_inp.is_displayed():
            name = random.choice(NAMES)
            name_inp.click()
            time.sleep(0.2)
            name_inp.clear()
            fast_type(name_inp, name)
            print_log(f"姓名 → {name}", "OK")
            time.sleep(0.3)
            name_inp.send_keys(Keys.ENTER)
            time.sleep(1)
        else:
            print_log("未找到姓名输入框", "ERROR")
            return email, False, None, time.time() - start_time

        # 8. 等待进入工作台
        print_log("等待工作台...")
        for _ in range(30):
            time.sleep(1)
            url = driver.current_url
            if 'business.gemini.google' in url and '/cid/' in url:
                print_log("工作台加载完成", "OK")
                break
        else:
            print_log(f"未跳转到工作台 → {driver.current_url}", "WARN")

        # 9. 保存配置
        elapsed = time.time() - start_time
        config = save_account_config(email, driver)
        if config:
            print_log(f"注册成功 → {email} (耗时 {elapsed:.1f}s)", "OK")
            return email, True, config, elapsed
        return email, False, None, elapsed

    except Exception as e:
        print_log(f"注册异常: {e}", "ERROR")
        log_error(email, str(e))
        return email, False, None, time.time() - start_time


# ==================== 账号上传 ====================
class AccountUploader:
    """账号上传管理类"""
    
    def __init__(self, api_host, admin_key):
        self.api_host = api_host.rstrip('/')
        self.admin_key = admin_key
        self.session = requests.Session()
        
    def login(self):
        """登录到服务器"""
        print_log("连接服务器中...")
        login_url = f"{self.api_host}/login"
        
        try:
            response = self.session.post(
                login_url,
                data={"admin_key": self.admin_key},
                allow_redirects=True,
                timeout=30
            )
            
            if len(self.session.cookies) > 0:
                print_log("服务器连接成功", "OK")
                return True
            
            if response.status_code == 200 and '登录' in response.text:
                print_log("密钥验证失败", "ERROR")
                return False
            
            print_log("服务器连接失败", "ERROR")
            return False
                
        except Exception as e:
            print_log(f"连接异常: {e}", "ERROR")
            return False
    
    def upload_and_replace(self, file_path):
        """覆盖上传账号配置"""
        if not Path(file_path).exists():
            print_log(f"文件不存在 → {file_path}", "ERROR")
            return False
        
        print_log(f"读取本地文件 → {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                accounts_data = json.load(f)
        except Exception as e:
            print_log(f"文件读取异常: {e}", "ERROR")
            return False
        
        print_log(f"本地账号 → {len(accounts_data)} 个")
        print_log("开始上传...")
        
        upload_url = f"{self.api_host}/accounts-config"
        
        try:
            response = self.session.put(
                upload_url,
                json=accounts_data,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                print_log("上传完成!", "OK")
                print_log(f"{result.get('message', '配置已更新')}")
                print_log(f"服务器账号 → {result.get('account_count', len(accounts_data))} 个")
                
                print()
                print_separator()
                print_log("正在获取服务器账号状态...")
                print_separator()
                self.view_accounts()
                
                return True
            else:
                print_log(f"上传失败,状态码: {response.status_code}", "ERROR")
                return False
                
        except Exception as e:
            print_log(f"上传异常: {e}", "ERROR")
            return False
    
    def upload_and_merge(self, file_path):
        """合并上传账号配置(保留远程正常账号)"""
        print_log("智能合并模式启动...")
        
        # 读取本地账号
        if not Path(file_path).exists():
            print_log(f"本地文件缺失 → {file_path}", "ERROR")
            return False
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                local_accounts = json.load(f)
            print_log(f"本地账号 → {len(local_accounts)} 个")
        except Exception as e:
            print_log(f"读取本地文件失败: {e}", "ERROR")
            return False
        
        # 获取远程账号配置
        print_log("获取远程配置...")
        config_url = f"{self.api_host}/accounts-config"
        
        try:
            response = self.session.get(config_url, timeout=30)
            if response.status_code == 200:
                remote_config = response.json()
                remote_accounts = remote_config.get('accounts', [])
                print_log(f"远程账号 → {len(remote_accounts)} 个")
            else:
                print_log("远程配置获取失败,仅上传本地", "WARN")
                remote_accounts = []
        except Exception as e:
            print_log(f"远程连接异常: {e},仅上传本地", "WARN")
            remote_accounts = []
        
        # 筛选远程正常账号(未过期、未禁用)
        valid_remote_accounts = []
        for account in remote_accounts:
            if account.get('disabled', False):
                continue
            
            expires_at = account.get('expires_at')
            if expires_at and expires_at != '未设置':
                try:
                    beijing_tz = timezone(timedelta(hours=8))
                    expire_time = datetime.strptime(expires_at, "%Y-%m-%d %H:%M:%S")
                    expire_time = expire_time.replace(tzinfo=beijing_tz)
                    current_time = datetime.now(beijing_tz)
                    if expire_time <= current_time:
                        continue
                except:
                    pass
            
            valid_remote_accounts.append(account)
        
        print_log(f"有效远程账号 → {len(valid_remote_accounts)} 个")
        
        # 合并账号(去重)
        merged_accounts = list(valid_remote_accounts)
        remote_ids = {acc.get('id') for acc in valid_remote_accounts}
        
        new_count = 0
        for local_account in local_accounts:
            local_id = local_account.get('id')
            if local_id not in remote_ids:
                merged_accounts.append(local_account)
                new_count += 1
        
        print_log(f"合并结果 → 保留 {len(valid_remote_accounts)} 个,新增 {new_count} 个,共 {len(merged_accounts)} 个")
        
        # 上传合并后的配置
        print_log("上传合并配置...")
        upload_url = f"{self.api_host}/accounts-config"
        
        try:
            response = self.session.put(
                upload_url,
                json=merged_accounts,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                print_log("合并上传完成!", "OK")
                print_log(f"{result.get('message', '配置已更新')}")
                print_log(f"服务器账号 → {result.get('account_count', len(merged_accounts))} 个")
                
                print()
                print_separator()
                print_log("正在获取服务器账号状态...")
                print_separator()
                self.view_accounts()
                
                return True
            else:
                print_log(f"上传失败,状态码: {response.status_code}", "ERROR")
                return False
                
        except Exception as e:
            print_log(f"上传异常: {e}", "ERROR")
            return False
    
    def view_accounts(self):
        """查看远程账号状态"""
        print_log("查询远程账号...")
        
        view_url = f"{self.api_host}/accounts"
        
        try:
            response = self.session.get(view_url, timeout=30)
            
            if response.status_code == 200:
                data = response.json()
                accounts = data.get('accounts', [])
                total = data.get('total', len(accounts))
                
                if not accounts:
                    print_log("远程无账号配置", "INFO")
                    return True
                
                print(f"\n共 {total} 个账号")
                print_separator("=", 120)
                
                # 表头
                print(f"{'序号':<6} {'账号ID':<35} {'状态':<12} {'过期时间':<22} {'剩余时长':<15} {'累计对话':<10}")
                print_separator("-", 120)
                
                # 账号列表
                for i, account in enumerate(accounts, 1):
                    acc_id = account.get('id', 'N/A')
                    status = account.get('status', 'N/A')
                    expires_at = account.get('expires_at', '未设置')
                    remaining = account.get('remaining_display', 'N/A')
                    conversations = account.get('conversation_count', 0)
                    
                    if len(acc_id) > 33:
                        acc_id = acc_id[:30] + "..."
                    
                    print(f"{i:<6} {acc_id:<35} {status:<12} {expires_at:<22} {remaining:<15} {conversations:<10}")
                
                print_separator("=", 120)
                return True
            else:
                print_log(f"查询失败 → 状态码 {response.status_code}", "ERROR")
                return False
                
        except Exception as e:
            print_log(f"查询异常: {e}", "ERROR")
            return False


# ==================== 主程序流程 ====================
def run_batch_registration(target_count):
    """批量注册账号 (保底成功数模式)"""
    print()
    print_separator()
    print(f"目标: 成功注册 {target_count} 个账号")
    print_separator()
    print()
    
    # 清空旧文件
    if Path(ACCOUNTS_FILE).exists():
        Path(ACCOUNTS_FILE).unlink()
        print_log(f"已清空 → {ACCOUNTS_FILE}")
    
    driver = None
    executor = ThreadPoolExecutor(max_workers=2)
    success_count = 0
    fail_count = 0
    attempt_count = 0
    total_time = 0
    success_times = []

    # 预创建第一个邮箱
    executor.submit(prefetch_email)
    
    # 连续失败计数器(用于保护机制)
    consecutive_fails = 0
    MAX_CONSECUTIVE_FAILS = 20

    # 循环直到成功数达到目标
    while success_count < target_count:
        # 检查全局停止标志
        global STOP_FLAG
        if STOP_FLAG:
            print_log("收到停止信号,中止任务", "WARN")
            STOP_FLAG = False  # 重置标志
            break
        
        # 连续失败保护
        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            print_log(f"连续失败 {MAX_CONSECUTIVE_FAILS} 次,中止本轮任务", "ERROR")
            break
        
        attempt_count += 1
        current_target = target_count + fail_count  # 动态调整显示的总数
        
        print()
        print_separator("#", 60)
        print(f"正在注册第 {attempt_count} 个账号 (成功: {success_count}/{target_count})")
        print_separator("#", 60)
        print()

        # 确保浏览器可用
        if driver is None:
            options = uc.ChromeOptions()
            if HEADLESS_MODE:
                print_log("启动无头浏览器...")
                options.add_argument("--headless=new")
                options.add_argument("--disable-gpu")
                options.add_argument("--no-sandbox")
                options.add_argument("--window-size=1200,800")
            else:
                print_log("启动浏览器...")
            driver = uc.Chrome(options=options, use_subprocess=True)
            if not HEADLESS_MODE:
                driver.set_window_size(100, 200)
                driver.set_window_position(50, 50)
            time.sleep(1)
        else:
            try:
                _ = driver.current_url
            except:
                print_log("浏览器已关闭,重启中...")
                try: 
                    driver.quit()
                except: 
                    pass
                options = uc.ChromeOptions()
                if HEADLESS_MODE:
                    options.add_argument("--headless=new")
                    options.add_argument("--disable-gpu")
                    options.add_argument("--no-sandbox")
                    options.add_argument("--window-size=1200,800")
                driver = uc.Chrome(options=options, use_subprocess=True)
                if not HEADLESS_MODE:
                    driver.set_window_size(100, 200)
                    driver.set_window_position(50, 50)
                time.sleep(1)

        try:
            email, success, config, elapsed = register_single_account(driver, executor)
            total_time += elapsed
            
            if success and config:
                success_count += 1
                success_times.append(elapsed)
                consecutive_fails = 0  # 重置连续失败计数
                print_log(f"进度: {success_count}/{target_count} 完成", "OK")
            else:
                fail_count += 1
                consecutive_fails += 1
                print_log(f"失败 +1 (连续失败: {consecutive_fails}/{MAX_CONSECUTIVE_FAILS})", "WARN")
                
        except Exception as e:
            error_msg = str(e).lower()
            print_log(f"注册异常: {e}", "ERROR")
            fail_count += 1
            consecutive_fails += 1
            
            # 检测空白页或页面加载问题
            if "blank" in error_msg or "timeout" in error_msg or "element" in error_msg:
                print_log("检测到页面异常,重启浏览器...", "WARN")
                if driver:
                    try: 
                        driver.quit()
                    except: 
                        pass
                    driver = None
            elif driver:
                try: 
                    driver.quit()
                except: 
                    pass
                driver = None

        avg_time = total_time / attempt_count if total_time > 0 else 0
        print_progress(success_count, target_count, success_count, fail_count, avg_time)

        if success_count < target_count and driver:
            try:
                driver.delete_all_cookies()
            except:
                pass
            time.sleep(random.randint(2, 3))

    executor.shutdown(wait=False)
    if driver:
        try: 
            driver.quit()
        except: 
            pass
        
        # Monkeypatch: 防止 __del__ 再次调用 quit 导致 WinError 6
        try:
            driver.quit = lambda: None
        except:
            pass
            
        driver = None

    # 统计信息
    avg_time = sum(success_times) / len(success_times) if success_times else 0
    min_time = min(success_times) if success_times else 0
    max_time = max(success_times) if success_times else 0
    
    print()
    print_separator()
    print(f"注册完成! 目标: {target_count}, 成功: {success_count}, 失败: {fail_count}, 总尝试: {attempt_count}")
    print(f"总耗时: {total_time:.1f}s | 平均: {avg_time:.1f}s | 最快: {min_time:.1f}s | 最慢: {max_time:.1f}s")
    print(f"账号已保存至: {ACCOUNTS_FILE}")
    print_separator()
    
    return {
        "success": success_count,
        "fail": fail_count,
        "attempts": attempt_count,
        "avg_time": avg_time,
        "success_times": success_times,
        "is_ok": success_count > 0
    }


def handle_task_execution(count, upload_mode, uploader):
    """执行一次完整的任务(注册+上传)"""
    stats = run_batch_registration(count)
    
    if stats.get('is_ok'):
        print()
        # 先登录再上传
        if not uploader.login():
            print_log("服务器登录失败,无法上传", "ERROR")
            return stats
        
        if upload_mode == 'replace':
            print_log("开始覆盖上传到服务器...")
            uploader.upload_and_replace(ACCOUNTS_FILE)
        elif upload_mode == 'merge':
            print_log("开始合并上传到服务器...")
            uploader.upload_and_merge(ACCOUNTS_FILE)
    else:
        print_log("注册流程未成功,取消上传", "WARN")
        
    return stats


def main():
    """主程序入口"""
    print_separator()
    print("Gemini Business 自动注册上传工具")
    print_separator()
    print()
    
    uploader = AccountUploader(API_HOST, ADMIN_KEY)
    
    # 登录服务器
    if not uploader.login():
        print_log("登录失败,无法继续", "ERROR")
        input("\n按回车键退出...")
        sys.exit(1)
    
    print()
    
    while True:
        print("\n请选择操作:")
        print("  1. 注册上传")
        print("  2. 查看远程账号状态")
        print("  3. 退出")
        print()
        
        choice = input("请输入选项 (1-3): ").strip()
        
        if choice == "1":
            # 确定上传模式
            upload_mode = 'merge'
            
            # 1. 询问数量
            count_str = input("\n请输入注册数量 (默认 5): ").strip()
            count = int(count_str) if count_str else 5
            
            # 2. 询问执行模式
            print("\n请选择执行模式:")
            print("  1. 立即执行一次")
            print("  2. 定时循环执行 (支持自定义间隔)")
            mode_choice = input("请输入选项 (1-2): ").strip()
            
            if mode_choice == "2":
                # 定时模式
                hours_str = input("\n请输入循环间隔小时 (默认 12): ").strip()
                try:
                    interval_hours = float(hours_str) if hours_str else 12.0
                except:
                    print_log("输入无效,使用默认值 12 小时", "WARN")
                    interval_hours = 12.0
                
                print(f"\n已选择: 定时循环模式 (间隔 {interval_hours} 小时)")
                run_now_str = input("是否在开始循环前立即运行一次? (y/n, 默认 y): ").strip().lower()
                run_now = run_now_str != 'n'
                
                print_log("定时任务已启动! 按 Ctrl+C 可随时停止", "INFO")
                
                loop_count = 0
                while True:
                    loop_count += 1
                    
                    if run_now or loop_count > 1:
                        print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] >>> 开始第 {loop_count} 次循环任务")
                        handle_task_execution(count, upload_mode, uploader)
                        print_log(f"第 {loop_count} 次任务完成", "INFO")
                    else:
                        print_log("跳过首次运行,直接进入等待", "INFO")

                    # 计算下次运行时间
                    next_run = datetime.now() + timedelta(hours=interval_hours)
                    print_log(f"下一次任务将在 {next_run.strftime('%Y-%m-%d %H:%M:%S')} 开始", "INFO")
                    
                    # 倒计时等待
                    total_seconds = int(interval_hours * 3600)
                    try:
                        while total_seconds > 0:
                            # 每分钟更新一次状态,显示剩余时间
                            if total_seconds % 60 == 0:
                                pass 
                            time.sleep(1)
                            total_seconds -= 1
                    except KeyboardInterrupt:
                        print("\n")
                        print_log("检测到中断, 停止定时任务", "WARN")
                        break
                        
            else:
                # 立即执行模式 (默认)
                print()
                handle_task_execution(count, upload_mode, uploader)
                
        elif choice == "2":
            print()
            uploader.view_accounts()
            
        elif choice == "3":
            print("\n再见!")
            break
            
        else:
            print_log("无效选项,请重试", "WARN")
        
        print()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n")
        print_log("用户中断程序", "INFO")
    except Exception as e:
        print_log(f"程序异常: {e}", "ERROR")
        input("\n按回车键退出...")


📌 转载信息
原作者:
zding
转载时间:
2026/1/14 18:06:03

整理 | 华卫

 

“是时候在工作场景中突破 AI 的应用极限了。”在近期的一档播客节目中,素有 “SaaS 教父” 之称的 Jason Lemkin 表示,这意味着其销售部门将不再招聘人工员工。Lemkin 是全球最大的企业服务创业者社区 SaaStr 的创始人,曾向 B2B 初创公司投资超过 2 亿美元,如今他正领导 SaaStr 全面押注 AI Agent。

 

他透露,公司目前部署了 20 个 Agent,已承接原本需要 10 名销售开发代表和客户主管协作完成的工作。从全人工团队到 AI 主导的转型,SaaStr 的推进速度相当迅速。

两名高薪员工闪辞,创始人直接用 Agent 换掉大半人

今年 5 月时,SaaStr 仅有 1 个 Agent 投入实际运营,用于处理各类数字化任务。然而就在当月举办的 SaaStr 年度大会期间,公司两名高薪销售代表突然宣布离职。Lemkin 回忆道,他随即找到公司首席 AI 官 Amelia Lerutte,明确表示:“我们的销售部门再也不招人工了,要全力用 Agent 突破行业天花板。”

 

在他看来,与其花费 15 万美元年薪招聘一名最终可能离职的初级销售代表,不如启用忠诚度更高的 Agent,这笔人力成本投入完全得不偿失。据 Lerutte 透露,到 6 月时,SaaStr 已开始大幅扩充投入运营的 Agent 数量。她表示:“当时我们只和 Delphi 合作部署了 1 个非核心业务 Agent,直到 6 月初才着手将 Agent 数量从 2 个增至 20 多个。那两名员工离职后,我们审慎决定将部分(非全部)人力预算转而投入 Agent 的研发与部署。”

 

Lemkin 称,让他做出此决定的还有一个原因是,大会筹备期间,Delphi 这个既没受过销售训练、也没做过市场推广的通用 Agent,居然独立签下了一笔 7 万美元的赞助订单。

 

“可能有人会觉得我这么做不够 ‘酷’,但我实在不想再招聘第 28 个注定会离职的销售了。”Lemkin 坦言道。

 

他还提到,如今在 SaaStr 的办公区,原先归属市场拓展团队 10 名员工的工位,都已换上了 Agent 的专属名牌,比如负责客户资质审核的 “Quali”、主打定制化服务的 “Arty”,以及对接代码协作平台 Replit 的 “Repli”。

 

“我坐在办公室最里面,中间的工位全是 Agent。整个办公室安静得不得了。”

“Agent 生产力不输人工,但别自己动手做”

 

如今 SaaStr 的销售团队已发展成为一种全新的运营模式:20 个 Agent 是主力,由 1.2 名员工管理,完成之前由 10 名销售开发代表 (SDR) 和客户经理 (AE) 完成的工作。据介绍,其中

0.2 个人工指的是负责公司大小事的 Lerutte,她只需要花 20% 的工作时间,就能把所有 Agent 的调度和管理都搞定。

 

“用最优秀的员工、最完善的工作流程去训练 Agent,它就能逐步成长为你团队里顶尖销售的‘数字分身’。”Lemkin 指出,SaaStr 正以公司内部顶尖员工为标杆训练 Agent。 抛开安全隐患不谈,Agent 的综合生产力与人工员工基本持平,但优势在于效率更高、且能像软件一样实现规模化复制。

 

Lemkin 表示,现在大部分公司宁愿雇一个 60、70 岁的资深工程师,也不愿意花时间培养新人。毕竟这样做效率更高,这种趋势也在蔓延到销售行业。 只要是价格谈判空间不大,而且 Agent 对产品的了解程度超过人类的场景,顶尖的 Agent 完全能独立完成签单,至少在他们的业务场景里是完全可行的。懂得管理 Agent、与 Agent 协作,而且对自家产品了如指掌的人,价值会越来越高,其他的人则会慢慢被边缘化。

 

“我们的目标不是用 AI 取代所有人,核心逻辑是:AI 正在接管那些没人愿意干的活,淘汰那些业绩平平、能力平庸的岗位。而顶尖的销售人才,AI 会成为他们的‘超级外挂’。”

 

据 Lemkin 预测,靠邮件跟进的销售开发专员、人工筛选线索的岗位,不仅效率低、客户体验也很差,明年这些岗位基本都会消失。至于负责谈单的核心销售岗位,到明年年底还能保住 70% 的工作内容,但长远来看,这个比例会降到 40% 到 50%。

 

此外,他表示,SaaStr 所有的市场推广类 Agent 都不是自建的,全都是直接采购的,并建议大家也别自己动手。“这些工具理论上确实能自己开发,但真的没必要。这类产品的成本不算特别高,完全值得购买。而且 AI 领域的创新迭代太快了,就算你真的自己做出来,可能几个月后就过时了。”

 

Lemkin 坚称,2026 年,不管是哪类公司,都应该有足够的动力去探索 AI 在市场推广中的应用边界。现在所有公司的诉求都很明确:要么靠 AI 提升效率,要么靠 AI 缓解海量客户咨询的压力。那些没跟上 AI 浪潮的公司,增长已经严重放缓,不是 “所有方法都失灵了”,只是效果远不如 2021 年那样显著,投入产出比太低了,企业也没有足够预算去支撑 2021 年那套老派的 SaaS 推广打法。

被批“太激进”,半年前才被 Replit 坑过?

 

SaaStr 的这套 Agent 应用流程,与开发者云端平台 Vercel 的实践异曲同工,后者曾让一名顶尖销售员工全程记录工作流程,耗时六周以此为蓝本训练出一款销售 Agent,使其能够精准复刻这名员工的工作模式。

 

然而,有网友提出疑问:“SaaStr 为何要大张旗鼓地宣布这件事?在 B2B SaaS 行业,客户服务流程是企业运营的核心命脉之一。他如此高调地宣称 ‘再也不招人工员工’,是想达到什么目的?难道他觉得潜在客户会被这种说法吸引还是排斥?依我看,在绝大多数行业领域里,答案恐怕都是后者。”

 

还有人表示,“敢把 ‘让顶尖销售员工全程记录工作流程,耗时六周完成数据采集,再据此打造出能复刻其工作模式的销售 Agent’ 的细节公之于众,胆子可真不小。我敢肯定,这短短六周里,绝对不可能覆盖到所有可能出现的业务场景。还有什么是不可能出错的呢?想必你们自己也无从知晓,毕竟团队里已经没有任何具备一线实战经验的员工可以咨询了。通过技术手段简化工作流程、优化员工的时间分配有很大的操作空间,但这类激进的替代策略,无疑是一场蓄势待发的灾难。”

 

甚至有人开始预测道:“有没有人敢打赌,这出闹剧会在多久之后彻底失控?届时 Jason 和他的团队恐怕只能悄悄叫停 Agent 项目,想方设法掩盖痕迹,再去招揽那些走投无路、愿意回头为他们效力的员工。不如我们把赌注再抬高些:要是后续曝出不当行为引发的诉讼,或是被竞争对手设局诱导 Agent 泄露了所有专有数据,赌注就加倍。”

 

当前,众多企业都在试水 AI Agent 技术,但风险依然不容忽视,其中最突出的隐患便是数据泄露与网络犯罪的威胁。英国阿达・洛芙莱斯研究所高级研究员 Harry Farmer 近期在接受采访时表示:“AI Agent 若要充分发挥功能、顺利接入各类应用程序,往往需要获取其运行设备的操作系统层级权限。”而这种全方位的权限获取,会为网络犯罪分子制造更多潜在的攻击切入点。

 

值得一提的是,Lemkin 在六个月前因为以下事情被热议:他仅用了 9 天就通过 Replit 的新型 AI Agent 构建出一个 SaaS 产品,并将其称之为“价值 100 万美元的产品”。该产品重写了核心页面,改进了用户体验,并快速上线。之后他开启了代码冻结模式,结果 Agent 却无视了该指令,把整个生产环境数据库都删了个精光。

 

参考链接:

https://www.businessinsider.com/godfather-of-saas-jason-lemkin-replace-humans-ai-agents-sales-2026-1

https://www.youtube.com/watch?v=I-R1bc1rlFs

写了个小工具可以在后台完成 Discord Quests 中除了 Discord Activity 以外全部类型任务。视频任务通过发心跳包实现,游戏任务通过模拟游戏进程实现。

Discord Quests 搞了这么久,奖励类别一般有游戏内道具和 Orbs 两种。前者一般是游戏厂商来推广时送的一些游戏道具,后者可以换 Discord 内的账号装饰。


📌 转载信息
转载时间:
2026/1/7 19:10:39

// ==UserScript==
// @name         Grok/X.ai 自动化注册机 (集成自动清理与循环版)
// @namespace    http://tampermonkey.net/
// @version      5.0
// @description  全自动流程:注册 -> 提取Token -> 清理Cookie -> 循环重启
// @author       Bytebender
// @match        *://*/*
// @match        https://x.ai/*
// @match        https://www.x.ai/*
// @match        https://accounts.x.ai/*
// @match        https://grok.com/*
// @match        https://www.grok.com/*
// @grant        GM_setValue
// @grant        GM_getValue
// @grant        GM_registerMenuCommand
// @grant        GM_notification
// @grant        GM_xmlhttpRequest
// @grant        GM_setClipboard
// @grant        GM_cookie
// @connect      mail.chatgpt.org.uk
// @connect      100.64.0.101
// @connect      api.x.ai
// @connect      x.ai
// @connect      grok.com
// @connect      www.grok.com
// @connect      accounts.x.ai
// ==/UserScript==

(function() {
    'use strict';

    // ========================================================
    // 1. 随机数据生成工具
    // ========================================================

    // 生成随机姓名 (首字母大写)
    function getRandomName() {
        const chars = 'abcdefghijklmnopqrstuvwxyz';
        const len = Math.floor(Math.random() * 5) + 4; // 长度 4-8
        let result = '';
        for (let i = 0; i < len; i++) {
            result += chars.charAt(Math.floor(Math.random() * chars.length));
        }
        return result.charAt(0).toUpperCase() + result.slice(1);
    }

    // 生成强密码 (12位,包含大小写+数字+特殊符号)
    function getRandomPassword() {
        const lower = "abcdefghijklmnopqrstuvwxyz";
        const upper = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
        const nums = "0123456789";
        const symbols = "!@#$%^&*";

        // 1. 确保每种字符至少有一个
        let pass = "";
        pass += lower[Math.floor(Math.random() * lower.length)];
        pass += upper[Math.floor(Math.random() * upper.length)];
        pass += nums[Math.floor(Math.random() * nums.length)];
        pass += symbols[Math.floor(Math.random() * symbols.length)];

        // 2. 补足剩余长度
        const allChars = lower + upper + nums + symbols;
        for (let i = 0; i < 8; i++) {
            pass += allChars[Math.floor(Math.random() * allChars.length)];
        }

        // 3. 打乱顺序 (洗牌)
        return pass.split('').sort(() => 0.5 - Math.random()).join('');
    }

    // ========================================================
    // 2. 任务编排配置
    // ========================================================
    const actions = [
        // --- 阶段一:Grok 首页跳转 ---
        {
            "step_name": "1. 点击 Grok 首页入口",
            "type": "click",
            "selector": "html > body > div:nth-of-type(2) > div > div > div > main > div > div > div:nth-of-type(3) > div:nth-of-type(2) > a:nth-of-type(2)",
            "url_keyword": "grok.com"
        },
        // --- 阶段二:进入注册页 (X.ai) ---
        {
            "step_name": "2. 点击 X.ai 注册/登录按钮",
            "type": "click",
            "selector": "html > body > div:nth-of-type(2) > div > div > div:nth-of-type(2) > div > div:nth-of-type(2) > button",
            "url_keyword": "accounts.x.ai"
        },
        // --- 阶段三:自动化邮箱 ---
        {
            "step_name": "3. 自动申请并填写临时邮箱",
            "type": "get_email",
            "selector": "html > body > div:nth-of-type(2) > div > div > div:nth-of-type(2) > div > form > div > div > input",
        },
        {
            "step_name": "4. 点击下一步 (提交邮箱)",
            "type": "click",
            "selector": "html > body > div:nth-of-type(2) > div > div > div:nth-of-type(2) > div > form > div:nth-of-type(2) > button"
        },
        // --- 阶段四:验证码 ---
        {
            "step_name": "5. 等待邮件验证码并自动填写",
            "type": "fill_code",
            "selector": "input[name='code']"
        },
        // --- 阶段五:填写个人信息 (全随机) ---
        {
            "step_name": "6. 填写名 (First Name)",
            "type": "input",
            "selector": "input[name='givenName']",
            "value": "__RANDOM__"
        },
        {
            "step_name": "7. 填写姓 (Last Name)",
            "type": "input",
            "selector": "input[name='familyName']",
            "value": "__RANDOM__"
        },
        {
            "step_name": "8. 填写密码 (强密码)",
            "type": "input",
            "selector": "input[name='password']",
            "value": "__RANDOM_PASS__"
        },
        // --- 阶段六:提交 ---
        {
            "step_name": "9. 点击最终提交",
            "type": "click",
            "selector": "button[type='submit']"
        },
        // --- 阶段七:提取 Token ---
        {
            "step_name": "10. 检查跳转并上传 Token",
            "type": "wait_url_and_upload",
            "target_url": "grok.com"
        },
        // --- 阶段八:清理环境并循环 ---
        {
            "step_name": "11. 清理 Cookie 并重启循环",
            "type": "clean_and_restart",
            "clean_targets": [
                "https://x.ai/",
                "https://www.x.ai/",
                "https://accounts.x.ai/",
                "https://grok.com/",
                "https://www.grok.com/"
            ]
        }
    ];

    // ========================================================
    // 3. 核心功能类 (邮箱/网络/Cookie)
    // ========================================================

    // 3.1 网络请求封装
    function gmFetch(url, options) {
        return new Promise((resolve, reject) => {
            GM_xmlhttpRequest({
                url: url,
                method: options.method || 'GET',
                headers: options.headers || {},
                data: options.body || null,
                onload: (response) => {
                    if (response.status >= 200 && response.status < 300) {
                        try { resolve(JSON.parse(response.responseText)); }
                        catch (e) { resolve(response.responseText); }
                    } else { reject(new Error(`HTTP Error: ${response.status}`)); }
                },
                onerror: () => reject(new Error('Network Error')),
                ontimeout: () => reject(new Error('Timeout'))
            });
        });
    }

    // 3.2 临时邮箱客户端
    class TempMailClient {
        constructor() {
            this.baseUrl = "https://mail.chatgpt.org.uk/api";
            this.headers = {
                "User-Agent": "Mozilla/5.0",
                "Origin": "https://mail.chatgpt.org.uk",
                "Referer": "https://mail.chatgpt.org.uk/"
            };
        }
        async getEmail() {
            const result = await gmFetch(`${this.baseUrl}/generate-email`, {
                method: "GET",
                headers: { ...this.headers, "content-type": "application/json" }
            });
            if (result && result.success && result.data?.email) return result.data.email;
            throw new Error("邮箱API返回异常");
        }
        async fetchMessages(email) {
            const url = `${this.baseUrl}/emails?email=${encodeURIComponent(email)}`;
            const result = await gmFetch(url, {
                method: "GET",
                headers: { ...this.headers, "cache-control": "no-cache" }
            });
            return (result.success && result.data?.emails) ? result.data.emails : [];
        }
        async waitForCode(email, timeoutSec = 120) {
            console.log(`[Mail] 开始监听 ${email} ...`);
            const startTime = Date.now();
            const codeRegex = /\b[A-Z0-9]{3}-[A-Z0-9]{3}\b|\b\d{6}\b/;
            return new Promise((resolve, reject) => {
                const timer = setInterval(async () => {
                    if (Date.now() - startTime > timeoutSec * 1000) {
                        clearInterval(timer);
                        reject(new Error("等待验证码超时"));
                    }
                    try {
                        const msgs = await this.fetchMessages(email);
                        if (msgs.length > 0) {
                            for (const msg of msgs) {
                                const content = (msg.subject || "") + " " + (msg.html_content || "");
                                const match = content.match(codeRegex);
                                if (match) {
                                    clearInterval(timer);
                                    resolve(match[0]);
                                    return;
                                }
                            }
                        }
                    } catch(e) { console.warn("Polling error:", e); }
                }, 3000);
            });
        }
    }

    // 3.3 Token 上传逻辑
    async function extractAndUploadToken() {
        return new Promise((resolve, reject) => {
            GM_cookie.list({ name: "sso" }, (cookies, error) => {
                if (error || !cookies || cookies.length === 0) {
                    return reject(new Error("SSO Cookie missing"));
                }
                const ssoToken = cookies[0].value;
                console.log("获取到 Token:", ssoToken.substring(0, 10) + "...");

                GM_xmlhttpRequest({
                    url: "http://xxx/api/tokens/add",
                    method: "POST",
                    headers: {
                        "content-type": "application/json",
                        "authorization": "Bearer xxxxx"
                    },
                    data: JSON.stringify({ tokens: [ssoToken], token_type: "sso" }),
                    onload: (response) => {
                        if (response.status >= 200 && response.status < 300) {
                            console.log("Token 上传成功!");
                            resolve();
                        } else {
                            reject(new Error("Upload failed: " + response.responseText));
                        }
                    },
                    onerror: (err) => reject(err)
                });
            });
        });
    }

    // 3.4 Cookie 清理逻辑
    function executeCleanCookies(targetUrls) {
        return new Promise((resolve) => {
            if (!targetUrls || targetUrls.length === 0) return resolve();
            let completed = 0;
            targetUrls.forEach(url => {
                GM_cookie.list({ url: url }, function(cookies, error) {
                    if (cookies && cookies.length > 0) {
                        cookies.forEach(c => {
                            GM_cookie.delete({ name: c.name, url: url }, () => {});
                        });
                    }
                    completed++;
                    if (completed === targetUrls.length) {
                        setTimeout(resolve, 800); // 缓冲
                    }
                });
            });
        });
    }

    // 3.5 Native 输入模拟 (绕过 React/Vue 绑定)
    function setNativeValue(element, value) {
        const valueSetter = Object.getOwnPropertyDescriptor(element, 'value').set;
        const prototype = Object.getPrototypeOf(element);
        const prototypeValueSetter = Object.getOwnPropertyDescriptor(prototype, 'value').set;
        if (valueSetter && valueSetter !== prototypeValueSetter) {
            prototypeValueSetter.call(element, value);
        } else {
            valueSetter.call(element, value);
        }
        element.dispatchEvent(new Event('input', { bubbles: true }));
    }

    // 3.6 元素等待
    const waitForElement = (selector, timeout = 10000) => {
        return new Promise((resolve, reject) => {
            const el = document.querySelector(selector);
            if (el) return resolve(el);
            const observer = new MutationObserver(() => {
                const el = document.querySelector(selector);
                if (el) { observer.disconnect(); resolve(el); }
            });
            observer.observe(document.body, { childList: true, subtree: true });
            setTimeout(() => { observer.disconnect(); reject(new Error('元素超时: ' + selector)); }, timeout);
        });
    };

    // ========================================================
    // 4. 自动化执行引擎
    // ========================================================
    const mailClient = new TempMailClient();
    let isRunning = GM_getValue('script_is_running', false);
    let currentIndex = GM_getValue('script_step_index', 0);

    console.log(`🚀 [注册机状态] Running: ${isRunning} | Step: ${currentIndex}`);

    GM_registerMenuCommand(`▶️ 启动/继续`, () => {
        GM_setValue('script_is_running', true);
        isRunning = true;
        runCurrentStep();
    });

    GM_registerMenuCommand("🔄 强制重置", () => {
        GM_setValue('script_step_index', 0);
        GM_setValue('script_is_running', false);
        GM_setValue('current_temp_email', '');
        location.reload();
    });

    async function runCurrentStep() {
        if (!GM_getValue('script_is_running', false)) return;

        // 异常保护:索引越界重置
        if (currentIndex >= actions.length) {
            GM_setValue('script_step_index', 0);
            return location.reload();
        }

        const action = actions[currentIndex];
        console.log(`[Step ${currentIndex + 1}] ${action.step_name} (${action.type})`);

        // URL 检查 (如果不在目标域名,等待跳转)
        if (action.url_keyword && !location.href.includes(action.url_keyword)) {
            console.log(`等待跳转到 ${action.url_keyword}...`);
            return setTimeout(runCurrentStep, 2000);
        }

        try {
            await new Promise(r => setTimeout(r, 3000)); // 基础缓冲
            let el = null;
            if (action.selector) el = await waitForElement(action.selector);

            // --- 动作分发 ---
            if (action.type === 'get_email') {
                const email = await mailClient.getEmail();
                console.log("获取邮箱:", email);
                GM_setValue('current_temp_email', email);
                GM_setClipboard(email);

                el.click(); el.focus();
                setNativeValue(el, email);
                el.dispatchEvent(new Event('change', { bubbles: true }));
                el.blur();
            }
            else if (action.type === 'fill_code') {
                const email = GM_getValue('current_temp_email');
                const rawCode = await mailClient.waitForCode(email);
                const code = rawCode.replace(/-/g, ''); // 清洗连字符
                console.log('填入验证码:', code);

                el.scrollIntoView({block: "center"});
                el.click(); el.focus();
                const nativeSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, "value").set;
                nativeSetter.call(el, code);
                el.dispatchEvent(new Event('input', { bubbles: true }));
                el.dispatchEvent(new Event('change', { bubbles: true }));
                await new Promise(r => setTimeout(r, 500));
                el.blur();
            }
            else if (action.type === 'input') {
                el.focus();
                let val = action.value;

                // 处理随机变量
                if (val === '__RANDOM__') val = getRandomName();
                if (val === '__RANDOM_PASS__') val = getRandomPassword();

                console.log(`[Input] 填写值: ${val}`);
                setNativeValue(el, val);
                el.blur();
            }
            else if (action.type === 'wait_url_and_upload') {
                if (!location.href.includes(action.target_url)) {
                    return setTimeout(runCurrentStep, 1500); // URL不对,继续等待
                }

                // 尝试多次上传,防止 Cookie 未即时写入
                let retry = 0;
                while (retry < 5) {
                    try {
                        await extractAndUploadToken();
                        GM_notification({ text: 'Token 上传成功!准备清理...', title: '成功' });
                        break;
                    } catch (e) {
                        console.warn("Token提取失败,重试中...", e);
                        await new Promise(r => setTimeout(r, 2000));
                        retry++;
                    }
                }
                // 继续下一步
            }
            else if (action.type === 'clean_and_restart') {
                GM_notification({ text: '清理 Cookie 并重启循环...', title: '系统维护' });
                await executeCleanCookies(action.clean_targets);

                GM_setValue('script_step_index', 0);
                GM_setValue('current_temp_email', '');

                console.log(">>> 循环重置完成,3秒后刷新");
                setTimeout(() => {
                    window.location.href = "https://grok.com/";
                }, 3000);
                return; // 结束本次执行栈
            }
            else if (action.type === 'click') {
                el.click();
            }

            // --- 步进逻辑 ---
            currentIndex++;
            GM_setValue('script_step_index', currentIndex);
            setTimeout(runCurrentStep, 1500);

        } catch (e) {
            console.error("执行出错:", e);
            // 遇到严重错误可以考虑刷新页面重试
            // setTimeout(() => location.reload(), 5000);
        }
    }

    // 启动检测
    function tryStart() {
        if (isRunning) {
            setTimeout(runCurrentStep, 1500);
        }
    }

    if (document.readyState === 'complete' || document.readyState === 'interactive') {
        tryStart();
    } else {
        window.addEventListener('load', tryStart);
    }

})();

修改 extractAndUploadToken 填入自己的 Grok2api 地址和凭证

开启无痕窗口打开 Grok,接着启动脚本即可

会自动生成邮箱、填写注册信息和验证码

IP 不好 CF 验证不会自动过,需要手动继续和点击下一步

IP 好就全自动了啦

循环注册 w