标签 selenium 下的文章

接:雨云无限白嫖 FRP 服务器攻略(无 aff)

@fatekey 大佬的方案是单独开一个容器,我想着反正我有青龙面板在跑脚本,站在巨人的肩膀上顺手写了一个,把多账户也写进来了

一、运行效果

先看实际运行的效果吧
【前两个账号运行已签到,仅测试多账户切换功能,临时注册的第三个号用于测试签到功能】

## 开始执行... 2026-01-23 16:57:53

2026-01-23 16:57:54.116950139 [W:onnxruntime:Default, cpuid_info.cc:91 LogEarlyWarning] Unknown CPU vendor. cpuinfo_vendor value: 16
2026-01-23 16:57:54,191 - INFO - --------------------------------------------------------------------------------
2026-01-23 16:57:54,191 - INFO - 雨云签到工具 by SerendipityR ~
2026-01-23 16:57:54,191 - INFO - Github发布页: https://github.com/SerendipityR-2022/Rainyun-Qiandao
2026-01-23 16:57:54,192 - INFO - --------------------------------------------------------------------------------
2026-01-23 16:57:54,192 - INFO - 雨云签到工具容器版 by fatekey ~
2026-01-23 16:57:54,192 - INFO - Github发布页: https://github.com/fatekey/Rainyun-Qiandao
2026-01-23 16:57:54,192 - INFO - --------------------------------------------------------------------------------
2026-01-23 16:57:54,192 - INFO -                    项目为二次开发青龙脚本化运行
2026-01-23 16:57:54,192 - INFO -                      本项目基于上述项目开发
2026-01-23 16:57:54,192 - INFO -                 本项目仅作为学习参考,请勿用于其他用途
2026-01-23 16:57:54,192 - INFO - --------------------------------------------------------------------------------
2026-01-23 16:57:54,192 - INFO - ✅ 成功解析3个账号
2026-01-23 16:57:54,192 - INFO - 
================= 处理第1个账号 ==================
2026-01-23 16:57:54,192 - INFO - 
========== 开始处理账号:TACGN ==========
2026-01-23 16:57:54,192 - INFO - ⏳ 随机延时 0 分钟 35 秒
2026-01-23 16:58:29,794 - INFO - ✅ Selenium驱动初始化成功,路径:/usr/bin/chromedriver
2026-01-23 16:58:29,809 - INFO - ✅ 已注入stealth.min.js反检测脚本
2026-01-23 16:58:29,809 - INFO - ⏳ 发起登录请求
2026-01-23 16:58:29,810 - INFO - 🌐 访问雨云登录页
2026-01-23 16:58:31,258 - INFO - 页面标题:登录 | 雨云
2026-01-23 16:58:31,258 - INFO - ⏳ 等待登录表单元素加载...
2026-01-23 16:58:31,376 - INFO - 📝 输入账号密码
2026-01-23 16:58:31,775 - INFO - ⏳ 正在登录中,耗时较长请稍等……
2026-01-23 16:58:54,899 - INFO - ✅ 未触发登录验证码
2026-01-23 16:58:59,906 - INFO - 当前页面: https://app.rainyun.com/dashboard
2026-01-23 16:58:59,910 - INFO - 页面标题: 总览 | 雨云
2026-01-23 16:58:59,940 - INFO - ✅ 账号登录成功:TACGN
2026-01-23 16:58:59,940 - INFO - 🌐 访问赚取积分页
2026-01-23 16:59:00,958 - INFO - 当前页面: https://app.rainyun.com/account/reward/earn
2026-01-23 16:59:00,962 - INFO - 页面标题: 赚取积分 | 雨云
2026-01-23 16:59:00,962 - INFO - 🔍 查找每日签到按钮
2026-01-23 16:59:01,016 - INFO - 📌 签到状态:已完成,无需重复签到
2026-01-23 16:59:01,047 - INFO - 💰 当前积分:700(约0.35元)
2026-01-23 16:59:01,131 - INFO - ✅ 账号TACGN浏览器已关闭
2026-01-23 16:59:01,131 - INFO - ✅ 临时文件清理完成
2026-01-23 16:59:01,131 - INFO - 
========== 账号TACGN处理完成 ==========

2026-01-23 16:59:05,996 - INFO - 
================= 处理第2个账号 ==================
2026-01-23 16:59:05,996 - INFO - 
========== 开始处理账号:ACGN_T ==========
2026-01-23 16:59:05,996 - INFO - ⏳ 随机延时 4 分钟 3 秒
2026-01-23 17:03:09,549 - INFO - ✅ Selenium驱动初始化成功,路径:/usr/bin/chromedriver
2026-01-23 17:03:09,564 - INFO - ✅ 已注入stealth.min.js反检测脚本
2026-01-23 17:03:09,564 - INFO - ⏳ 发起登录请求
2026-01-23 17:03:09,564 - INFO - 🌐 访问雨云登录页
2026-01-23 17:03:11,015 - INFO - 页面标题:登录 | 雨云
2026-01-23 17:03:11,016 - INFO - ⏳ 等待登录表单元素加载...
2026-01-23 17:03:11,155 - INFO - 📝 输入账号密码
2026-01-23 17:03:11,584 - INFO - ⏳ 正在登录中,耗时较长请稍等……
2026-01-23 17:03:34,692 - INFO - ✅ 未触发登录验证码
2026-01-23 17:03:39,699 - INFO - 当前页面: https://app.rainyun.com/dashboard
2026-01-23 17:03:39,703 - INFO - 页面标题: 总览 | 雨云
2026-01-23 17:03:39,733 - INFO - ✅ 账号登录成功:ACGN_T
2026-01-23 17:03:39,733 - INFO - 🌐 访问赚取积分页
2026-01-23 17:03:40,777 - INFO - 当前页面: https://app.rainyun.com/account/reward/earn
2026-01-23 17:03:40,782 - INFO - 页面标题: 赚取积分 | 雨云
2026-01-23 17:03:40,783 - INFO - 🔍 查找每日签到按钮
2026-01-23 17:03:40,858 - INFO - 📌 签到状态:已完成,无需重复签到
2026-01-23 17:03:40,881 - INFO - 💰 当前积分:4684(约2.34元)
2026-01-23 17:03:40,966 - INFO - ✅ 账号ACGN_T浏览器已关闭
2026-01-23 17:03:40,966 - INFO - ✅ 临时文件清理完成
2026-01-23 17:03:40,966 - INFO - 
========== 账号ACGN_T处理完成 ==========

2026-01-23 17:03:43,883 - INFO - 
================= 处理第3个账号 ==================
2026-01-23 17:03:43,883 - INFO - 
========== 开始处理账号:ACGN ==========
2026-01-23 17:03:43,883 - INFO - ⏳ 随机延时 4 分钟 50 秒
2026-01-23 17:08:34,684 - INFO - ✅ Selenium驱动初始化成功,路径:/usr/bin/chromedriver
2026-01-23 17:08:34,698 - INFO - ✅ 已注入stealth.min.js反检测脚本
2026-01-23 17:08:34,699 - INFO - ⏳ 发起登录请求
2026-01-23 17:08:34,699 - INFO - 🌐 访问雨云登录页
2026-01-23 17:08:36,143 - INFO - 页面标题:登录 | 雨云
2026-01-23 17:08:36,143 - INFO - ⏳ 等待登录表单元素加载...
2026-01-23 17:08:36,285 - INFO - 📝 输入账号密码
2026-01-23 17:08:36,657 - INFO - ⏳ 正在登录中,耗时较长请稍等……
2026-01-23 17:08:59,802 - INFO - ✅ 未触发登录验证码
2026-01-23 17:09:04,809 - INFO - 当前页面: https://app.rainyun.com/dashboard
2026-01-23 17:09:04,813 - INFO - 页面标题: 总览 | 雨云
2026-01-23 17:09:04,842 - INFO - ✅ 账号登录成功:ACGN
2026-01-23 17:09:04,842 - INFO - 🌐 访问赚取积分页
2026-01-23 17:09:05,841 - INFO - 当前页面: https://app.rainyun.com/account/reward/earn
2026-01-23 17:09:05,871 - INFO - 页面标题: 赚取积分 | 雨云
2026-01-23 17:09:05,871 - INFO - 🔍 查找每日签到按钮
2026-01-23 17:09:05,940 - INFO - 📌 签到状态:领取奖励,开始领取
2026-01-23 17:09:06,072 - INFO - ⚠️ 触发签到验证码
2026-01-23 17:09:06,253 - INFO - 🔄 验证码处理第1次尝试(最大10次)
2026-01-23 17:09:06,810 - INFO - 开始下载验证码图片(1):https://turing.captcha.qcloud.com/cap_union_new_getcapbysig?img_index=1&image=02680900003d283800000015123b75d53fed&sess=s0HtD0kpY6pcWGI6FzFpt6ZiszTr2EhH-VfdHwPwxIdqv34Z-7I44K0-_RhKCQ_D1pczn56AhHTy7TzWXqVjayAnecALMlUWYf152tXUBM_URxYIPDxvoDD7jXbk7mwSIeJKDAUtmTTnuuaRcoqdw3DlBpEXv3Xc4RbCewuRGJZUAkZPrzzB8njktvXIOPrqAhs4UafKm96GgAUPJExW9_2PkkRGBKSTS43H1uLzB9el3g70xLMDYSd2TywoxM5Ps2idtfMPBn_aMw93gVXYLGwpX0Iztn4QG1vFv9VJj6NgCvOU2YSfCmTrGyEXxdzPnGglGJAKJFB0FfuxP4bM3-0O4DQt4l-5NsT52KR_8WcG7rvohxQXZy1sRw9MY84c31oFqKfUyPsa49v1VdtmranaOtiaDLX6SjgI6rJPvt2_kSelSHRNUWtA**
2026-01-23 17:09:07,077 - INFO - 开始下载验证码图片(2):https://turing.captcha.qcloud.com/cap_union_new_getcapbysig?img_index=0&image=02680900003d283800000015123b75d53fed&sess=s0HtD0kpY6pcWGI6FzFpt6ZiszTr2EhH-VfdHwPwxIdqv34Z-7I44K0-_RhKCQ_D1pczn56AhHTy7TzWXqVjayAnecALMlUWYf152tXUBM_URxYIPDxvoDD7jXbk7mwSIeJKDAUtmTTnuuaRcoqdw3DlBpEXv3Xc4RbCewuRGJZUAkZPrzzB8njktvXIOPrqAhs4UafKm96GgAUPJExW9_2PkkRGBKSTS43H1uLzB9el3g70xLMDYSd2TywoxM5Ps2idtfMPBn_aMw93gVXYLGwpX0Iztn4QG1vFv9VJj6NgCvOU2YSfCmTrGyEXxdzPnGglGJAKJFB0FfuxP4bM3-0O4DQt4l-5NsT52KR_8WcG7rvohxQXZy1sRw9MY84c31oFqKfUyPsa49v1VdtmranaOtiaDLX6SjgI6rJPvt2_kSelSHRNUWtA**
2026-01-23 17:09:07,375 - ERROR - ⚠️ 图案2识别率0.0000低于阈值0.4
2026-01-23 17:09:07,376 - ERROR - ❌ 验证码坐标重复,答案无效
2026-01-23 17:09:07,376 - ERROR - ❌ 验证码处理失败:验证码答案无效
2026-01-23 17:09:07,376 - ERROR - ⏳ 刷新验证码中,稍后重试……
2026-01-23 17:09:13,495 - INFO - 🔄 验证码处理第2次尝试(最大10次)
2026-01-23 17:09:13,518 - INFO - 开始下载验证码图片(1):https://turing.captcha.qcloud.com/cap_union_new_getcapbysig?img_index=1&image=0268090000946c2b0000000bb5e61fd63312&sess=s0_hrS7I5bVMRdivCWNVX_5xijZd5qBztok8b_H7bwMciiNFNIe3KMmj4IPktJO-cbs-8dl7upCI40ZosuxWWRjpXlIbF-P3ZWNoFjjg5G9dMFSybpTUgmgQO1lGEy1QSjGIghi44ITJTpCGcF4ym8wD4iU0xLCVakXfJvTvPiJbxl055LMVFM8W1FM1TtThPXpkg5h9JgYXRHols_wYhIgOI_dRxdgl3r_h-dSKI109RxypesTYee-w0m-Lw_41AM1etin4G_Iamp3lveRUaOtNV1JT4ssYxJ3DR1NZ8SEfN3yxvn9Z-_dxfifqGBxz8hkBmv4vsmx4M9imY60mxrr32HJt0K1ODVgIkzXKA0mgcq1DsSXM0AlcE765_pI_-NP9BgOPXEivjsEDpnxrS-nUFA1DJEz6urpWBwjgZN80OGgAAIs1XL1A**
2026-01-23 17:09:13,731 - INFO - 开始下载验证码图片(2):https://turing.captcha.qcloud.com/cap_union_new_getcapbysig?img_index=0&image=0268090000946c2b0000000bb5e61fd63312&sess=s0_hrS7I5bVMRdivCWNVX_5xijZd5qBztok8b_H7bwMciiNFNIe3KMmj4IPktJO-cbs-8dl7upCI40ZosuxWWRjpXlIbF-P3ZWNoFjjg5G9dMFSybpTUgmgQO1lGEy1QSjGIghi44ITJTpCGcF4ym8wD4iU0xLCVakXfJvTvPiJbxl055LMVFM8W1FM1TtThPXpkg5h9JgYXRHols_wYhIgOI_dRxdgl3r_h-dSKI109RxypesTYee-w0m-Lw_41AM1etin4G_Iamp3lveRUaOtNV1JT4ssYxJ3DR1NZ8SEfN3yxvn9Z-_dxfifqGBxz8hkBmv4vsmx4M9imY60mxrr32HJt0K1ODVgIkzXKA0mgcq1DsSXM0AlcE765_pI_-NP9BgOPXEivjsEDpnxrS-nUFA1DJEz6urpWBwjgZN80OGgAAIs1XL1A**
2026-01-23 17:09:14,002 - ERROR - ⚠️ 图案2识别率0.1515低于阈值0.4
2026-01-23 17:09:14,003 - ERROR - ❌ 验证码坐标重复,答案无效
2026-01-23 17:09:14,003 - ERROR - ❌ 验证码处理失败:验证码答案无效
2026-01-23 17:09:14,003 - ERROR - ⏳ 刷新验证码中,稍后重试……
2026-01-23 17:09:23,108 - INFO - 🔄 验证码处理第3次尝试(最大10次)
2026-01-23 17:09:23,131 - INFO - 开始下载验证码图片(1):https://turing.captcha.qcloud.com/cap_union_new_getcapbysig?img_index=1&image=0268090000f13d2300000015123b75d53f2f&sess=s02qPcN6ye2H2TPQfQ9ghy_0L3jB722YFRMCmx-rWnjm4UgxUo3F4WLoUzz5JczVgNJMtQwRWLFRo4OvXls1zjaajvPXch4RMoo6YZOavScFvdGaB-9B-ecxWvfcPx7ZTEb03-5MTmG-P2LipAwhLGAYKO0JOK6Rb6z3KYkAy9pxHIXYP9FaLlwdRvLsEDbqKWJZKCP4IHJ9mav4XH2EoTFfWGYMR-sA53gKcavXkSbzg2J_3ntSL6rszaLREZi9ZCSn1bPIDt16NYUXhHhlPFCJmBzIh41fG-nFTtpB-A8_i_vPaNo3mwlxJ9KojhSP37q7CfeASWq8-DTtI3OnT-mZbyVzoDHvBgQOiiQu5o0_VxQtxzWD9vNmVbErvVsP1VxQEVv0GCFywapI0H-R2DimaJI87vvVkVIzVce2MZQJ_lxWICubZ-RA**
2026-01-23 17:09:23,352 - INFO - 开始下载验证码图片(2):https://turing.captcha.qcloud.com/cap_union_new_getcapbysig?img_index=0&image=0268090000f13d2300000015123b75d53f2f&sess=s02qPcN6ye2H2TPQfQ9ghy_0L3jB722YFRMCmx-rWnjm4UgxUo3F4WLoUzz5JczVgNJMtQwRWLFRo4OvXls1zjaajvPXch4RMoo6YZOavScFvdGaB-9B-ecxWvfcPx7ZTEb03-5MTmG-P2LipAwhLGAYKO0JOK6Rb6z3KYkAy9pxHIXYP9FaLlwdRvLsEDbqKWJZKCP4IHJ9mav4XH2EoTFfWGYMR-sA53gKcavXkSbzg2J_3ntSL6rszaLREZi9ZCSn1bPIDt16NYUXhHhlPFCJmBzIh41fG-nFTtpB-A8_i_vPaNo3mwlxJ9KojhSP37q7CfeASWq8-DTtI3OnT-mZbyVzoDHvBgQOiiQu5o0_VxQtxzWD9vNmVbErvVsP1VxQEVv0GCFywapI0H-R2DimaJI87vvVkVIzVce2MZQJ_lxWICubZ-RA**
2026-01-23 17:09:23,677 - INFO - 🎯 图案 1 坐标(37,277),匹配率:0.6552
2026-01-23 17:09:23,677 - INFO - 🎯 图案 2 坐标(598,114),匹配率:0.5641
2026-01-23 17:09:23,677 - INFO - 🎯 图案 3 坐标(437,197),匹配率:0.6207
2026-01-23 17:09:26,517 - INFO - 📤 提交验证码
2026-01-23 17:09:31,670 - INFO - ✅ 验证码验证通过
2026-01-23 17:09:36,676 - INFO - ✅ 签到奖励领取成功
2026-01-23 17:09:36,699 - INFO - 💰 当前积分:700(约0.35元)
2026-01-23 17:09:36,788 - INFO - ✅ 账号ACGN浏览器已关闭
2026-01-23 17:09:36,788 - INFO - ✅ 临时文件清理完成
2026-01-23 17:09:36,788 - INFO - 
========== 账号ACGN处理完成 ==========

2026-01-23 17:09:40,305 - INFO - 
🎉 所有账号处理完成!

## 执行结束... 2026-01-23 17:09:40  耗时 707 秒   

二、前置条件

青龙面板:我是直接用的 1panel 应用商店里的青龙面板
雨云账号密码:自个注册去

三、准备工作

青龙面板安装依赖:安装不上的,不会的请自行搜索教程或者请教 AI 了哦
—— NodeJs:chromium
—— Python3:selenium
—— Linux:chromium-driver
青龙面板配置环境变量:想设置多少自己设置就好了
—— RAINYUN_ACCOUNT
—— [[“账号 1”,“账号 1 密码”],[“账号 2”,“账号 2 密码”]]

四、创建文件

同目录下创建以下两个文件

stealth.min.js
stealth.min.js.txt

rainyun.py
rainyun.py.txt

删掉 [.txt] 后缀上传就好了,顺便贴出 rainyun.py 的代码如下

import json
import logging
import os
import random
import re
import sys
import time
from typing import Tuple, Optional, List

import cv2
import ddddocr
import requests
from selenium import webdriver
from selenium.common import TimeoutException, WebDriverException, NoSuchElementException
from selenium.webdriver import ActionChains
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

# ===================== 青龙面板专属配置(常量不抽离) =====================
CONFIG = {
    "timeout": 20,  # 青龙面板网络可能不稳定,延长超时时间
    "max_delay": 5,  # 最大随机等待分钟数
    "max_captcha_retry": 10,  # 验证码最大重试次数(防止递归栈溢出)
    "similarity_threshold": 0.4,  # 降低阈值提升识别率
    "script_path": os.path.dirname(os.path.abspath(__file__)),  # 青龙脚本所在目录
    "temp_path": os.path.join(os.path.dirname(os.path.abspath(__file__)), "temp"),  # 临时文件路径
    "rainyun_login_url": "https://app.rainyun.com/auth/login",
    "rainyun_earn_url": "https://app.rainyun.com/account/reward/earn"
}

# 全局日志对象(仅日志全局化,核心变量均函数内初始化)
logger = logging.getLogger(__name__)

def init_logger():
    """初始化青龙面板日志格式(增强版)"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[logging.StreamHandler(sys.stdout)]
    )
    # 打印项目信息
    logger.info("-"*80)
    logger.info("雨云签到工具 by SerendipityR ~")
    logger.info("Github发布页: https://github.com/SerendipityR-2022/Rainyun-Qiandao")
    logger.info("-"*80)
    logger.info("雨云签到工具容器版 by fatekey ~")
    logger.info("Github发布页: https://github.com/fatekey/Rainyun-Qiandao")
    logger.info("-"*80)
    logger.info("                   项目为二次开发青龙脚本化运行")
    logger.info("                     本项目基于上述项目开发")
    logger.info("                本项目仅作为学习参考,请勿用于其他用途")
    logger.info("-"*80)

def init_selenium() -> WebDriver:
    """初始化青龙面板专用Selenium驱动(每次调用新建实例,避免缓存污染)"""
    ops = Options()
    # 容器环境必需配置
    ops.add_argument("--no-sandbox")
    ops.add_argument("--disable-dev-shm-usage")
    ops.add_argument("--headless=new")
    ops.add_argument("--disable-gpu")
    ops.add_argument("--window-size=1920,1080")
    ops.add_argument("--user-agent=Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
    # 反爬配置
    ops.add_experimental_option("excludeSwitches", ["enable-automation"])
    ops.add_experimental_option('useAutomationExtension', False)
    ops.add_argument("--disable-blink-features=AutomationControlled")
    
    # 青龙面板固定驱动路径校验
    driver_path = "/usr/bin/chromedriver"
    if not os.path.exists(driver_path) or not os.access(driver_path, os.X_OK):
        raise FileNotFoundError(
            f"青龙面板未安装chromium-driver!\n"
            f"请在青龙终端执行:apt update && apt install -y chromium-driver"
        )

    try:
        service = Service(executable_path=driver_path)
        driver = webdriver.Chrome(service=service, options=ops)
        # 清空缓存(双重保障)
        driver.delete_all_cookies()
        logger.info(f"✅ Selenium驱动初始化成功,路径:{driver_path}")
        return driver
    except WebDriverException as e:
        logger.error(f"❌ 驱动启动失败:{str(e)}")
        raise

def check_stealth_js() -> str:
    """检查青龙脚本目录下的stealth.min.js"""
    js_path = os.path.join(CONFIG["script_path"], "stealth.min.js")
    if not os.path.exists(js_path):
        logger.error(f"❌ 未找到stealth.min.js!请将文件上传到青龙脚本目录:{CONFIG['script_path']}")
        logger.info("📥 下载地址:https://raw.githubusercontent.com/berstend/puppeteer-extra/master/packages/puppeteer-extra-plugin-stealth/evasions/stealth.min.js")
        sys.exit(1)
    return js_path

def inject_stealth_js(driver: WebDriver):
    """注入反检测脚本(传入driver实例,解耦全局变量)"""
    js_path = check_stealth_js()
    with open(js_path, "r", encoding="utf-8") as f:
        js = f.read()
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js})
    logger.info("✅ 已注入stealth.min.js反检测脚本")

def download_image(url: str, filename: str, img_index: int) -> bool:
    """下载图片(带URL日志+青龙面板加请求头防拦截)"""
    os.makedirs(CONFIG["temp_path"], exist_ok=True)
    try:
        logger.info(f"开始下载验证码图片({img_index}):{url}")
        headers = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Referer": "https://app.rainyun.com/"
        }
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        path = os.path.join(CONFIG["temp_path"], filename)
        with open(path, "wb") as f:
            f.write(response.content)
        return True
    except Exception as e:
        logger.error(f"❌ 下载图片失败 {url}:{str(e)}")
        return False

# ========== 工具函数(精简+健壮性优化) ==========
def get_url_from_style(style: str) -> Optional[str]:
    """从style属性提取URL"""
    try:
        match = re.search(r'url\(["\']?(.*?)["\']?\)', style)
        return match.group(1) if match else None
    except Exception:
        return None

def get_width_from_style(style: str) -> str:
    """从style属性提取宽度"""
    match = re.search(r'width:\s*([\d.]+)px', style)
    return match.group(1) if match else "300"

def get_height_from_style(style: str) -> str:
    """从style属性提取高度"""
    match = re.search(r'height:\s*([\d.]+)px', style)
    return match.group(1) if match else "150"

def compute_similarity(img1_path: str, img2_path: str) -> Tuple[float, int]:
    """青龙面板适配:SIFT不可用时用ORB(增加异常兜底)"""
    try:
        img1 = cv2.imread(img1_path, cv2.IMREAD_GRAYSCALE)
        img2 = cv2.imread(img2_path, cv2.IMREAD_GRAYSCALE)
        if img1 is None or img2 is None:
            logger.warning(f"❌ 图片读取失败:{img1_path} 或 {img2_path}")
            return 0.0, 0

        # 优先SIFT,降级ORB
        try:
            sift = cv2.SIFT_create()
            norm = cv2.NORM_L2
        except AttributeError:
            sift = cv2.ORB_create()
            norm = cv2.NORM_HAMMING
            logger.warning("⚠️ SIFT不可用,使用ORB匹配")

        kp1, des1 = sift.detectAndCompute(img1, None)
        kp2, des2 = sift.detectAndCompute(img2, None)
        if des1 is None or des2 is None:
            return 0.0, 0

        bf = cv2.BFMatcher(norm, crossCheck=False)
        matches = bf.knnMatch(des1, des2, k=2)
        good = [m for m, n in matches if m.distance < 0.8 * n.distance]
        similarity = len(good) / len(matches) if matches else 0.0
        return similarity, len(good)
    except Exception as e:
        logger.error(f"❌ 相似度计算失败:{str(e)}")
        return 0.0, 0

def download_captcha_img(driver: WebDriver, wait: WebDriverWait) -> bool:
    """下载并分割验证码图片(解耦全局变量)"""
    try:
        # 清空旧临时文件
        if os.path.exists(CONFIG["temp_path"]):
            for f in os.listdir(CONFIG["temp_path"]):
                os.remove(os.path.join(CONFIG["temp_path"], f))

        # 定位验证码背景图
        slideBg = wait.until(EC.visibility_of_element_located((By.XPATH, '//*[@id="slideBg"]')))
        img1_url = get_url_from_style(slideBg.get_attribute("style"))
        if not img1_url or not download_image(img1_url, "captcha.jpg", 1):
            return False

        # 定位验证码碎片图
        sprite = wait.until(EC.visibility_of_element_located((By.XPATH, '//*[@id="instruction"]/div/img')))
        if not download_image(sprite.get_attribute("src"), "sprite.jpg", 2):
            return False

        # 分割碎片图
        raw = cv2.imread(os.path.join(CONFIG["temp_path"], "sprite.jpg"))
        if raw is None:
            logger.error("❌ 验证码碎片图读取失败")
            return False
        w = raw.shape[1]
        for i in range(3):
            cv2.imwrite(
                os.path.join(CONFIG["temp_path"], f"sprite_{i+1}.jpg"),
                raw[:, w//3*i : w//3*(i+1)]
            )
        return True
    except TimeoutException:
        logger.error("❌ 验证码图片加载超时")
        return False
    except Exception as e:
        logger.error(f"❌ 验证码图片处理失败:{str(e)}")
        return False

def check_answer(result: dict) -> bool:
    """检查验证码答案有效性(带识别率日志)"""
    valid = True
    for i in range(3):
        sim = float(result.get(f"sprite_{i+1}.similarity", 0))
        if sim < CONFIG["similarity_threshold"]:
            logger.error(f"⚠️ 图案{i+1}识别率{sim:.4f}低于阈值{CONFIG['similarity_threshold']}")
            valid = False
            break
    # 检查坐标唯一性
    positions = [result.get(f"sprite_{i+1}.position") for i in range(3)]
    if len(set(positions)) != 3:
        logger.error("❌ 验证码坐标重复,答案无效")
        valid = False
    return valid

def process_captcha(driver: WebDriver, wait: WebDriverWait) -> bool:
    """处理验证码(改递归为循环,提升健壮性,解耦全局变量)"""
    captcha_retry_count = 0
    ocr = ddddocr.DdddOcr(ocr=True, show_ad=False)
    det = ddddocr.DdddOcr(det=True, show_ad=False)

    while captcha_retry_count < CONFIG["max_captcha_retry"]:
        captcha_retry_count += 1
        logger.info(f"🔄 验证码处理第{captcha_retry_count}次尝试(最大{CONFIG['max_captcha_retry']}次)")
        try:
            # 下载验证码图片
            if not download_captcha_img(driver, wait):
                raise Exception("验证码图片下载失败")

            # 校验验证码有效性
            valid = True
            for i in range(3):
                sprite_path = os.path.join(CONFIG["temp_path"], f"sprite_{i+1}.jpg")
                with open(sprite_path, "rb") as f:
                    if ocr.classification(f.read()) in ["0", "1"]:
                        valid = False
                        break
            if not valid:
                raise Exception("验证码碎片无效")

            # 识别验证码
            captcha = cv2.imread(os.path.join(CONFIG["temp_path"], "captcha.jpg"))
            if captcha is None:
                raise Exception("验证码背景图读取失败")
            with open(os.path.join(CONFIG["temp_path"], "captcha.jpg"), "rb") as f:
                bboxes = det.detection(f.read())
            if not bboxes:
                raise Exception("未检测到验证码图案")

            # 匹配碎片与背景图
            result = {}
            for i, (x1, y1, x2, y2) in enumerate(bboxes):
                # 裁剪背景图中的图案
                cv2.imwrite(os.path.join(CONFIG["temp_path"], f"spec_{i+1}.jpg"), captcha[y1:y2, x1:x2])
                # 计算与每个碎片的相似度
                for j in range(3):
                    sim, _ = compute_similarity(
                        os.path.join(CONFIG["temp_path"], f"sprite_{j+1}.jpg"),
                        os.path.join(CONFIG["temp_path"], f"spec_{i+1}.jpg")
                    )
                    key_sim = f"sprite_{j+1}.similarity"
                    key_pos = f"sprite_{j+1}.position"
                    if sim > float(result.get(key_sim, 0)):
                        result[key_sim] = sim
                        result[key_pos] = f"{int((x1+x2)/2)},{int((y1+y2)/2)}"

            # 校验答案
            if not check_answer(result):
                raise Exception("验证码答案无效")

            # 打印匹配结果
            for i in range(3):
                pos = result[f"sprite_{i+1}.position"]
                sim = result[f"sprite_{i+1}.similarity"]
                x, y = pos.split(",")
                logger.info(f"🎯 图案 {i+1} 坐标({x},{y}),匹配率:{sim:.4f}")

            # 点击验证码图案
            slideBg = wait.until(EC.visibility_of_element_located((By.XPATH, '//*[@id="slideBg"]')))
            style = slideBg.get_attribute("style")
            width, height = float(get_width_from_style(style)), float(get_height_from_style(style))
            width_raw, height_raw = captcha.shape[1], captcha.shape[0]

            for i in range(3):
                pos = result[f"sprite_{i+1}.position"]
                x, y = map(int, pos.split(","))
                # 计算实际点击坐标(适配页面缩放)
                final_x = int(-width/2 + x/width_raw * width) + random.randint(-1, 1)
                final_y = int(-height/2 + y/height_raw * height) + random.randint(-1, 1)
                ActionChains(driver).move_to_element_with_offset(slideBg, final_x, final_y).click().perform()
                time.sleep(random.uniform(0.5, 1))

            # 提交验证码
            logger.info("📤 提交验证码")
            confirm = wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@id="tcStatus"]/div[2]/div[2]/div/div')))
            confirm.click()
            time.sleep(5)

            # 校验验证码结果
            tc_operation = wait.until(EC.visibility_of_element_located((By.XPATH, '//*[@id="tcOperation"]')))
            if tc_operation.get_attribute("class") == "tc-opera pointer show-success":
                logger.info("✅ 验证码验证通过")
                return True
            else:
                raise Exception("验证码验证失败")

        except Exception as e:
            logger.error(f"❌ 验证码处理失败:{str(e)}")
            # 刷新验证码重试
            try:
                logger.error("⏳ 刷新验证码中,稍后重试……")
                reload = driver.find_element(By.XPATH, '//*[@id="reload"]')
                time.sleep(3)
                reload.click()
                time.sleep(min(3 * (2 ** (captcha_retry_count - 1)), 30))  # 指数退避重试间隔,上限30秒
            except NoSuchElementException:
                logger.error("❌ 验证码刷新按钮未找到,重试失败")
                return False

    logger.error(f"❌ 验证码重试{CONFIG['max_captcha_retry']}次仍失败,放弃")
    return False

def clean_temp():
    """清理青龙面板临时文件(增加容错)"""
    try:
        if os.path.exists(CONFIG["temp_path"]):
            for f in os.listdir(CONFIG["temp_path"]):
                file_path = os.path.join(CONFIG["temp_path"], f)
                try:
                    os.remove(file_path)
                except Exception as e:
                    logger.warning(f"⚠️ 删除临时文件{file_path}失败:{str(e)}")
            os.rmdir(CONFIG["temp_path"])
        logger.info("✅ 临时文件清理完成")
    except Exception as e:
        logger.warning(f"⚠️ 清理临时文件失败:{str(e)}")

def parse_accounts() -> List[List[str]]:
    """解析青龙面板RAINYUN_ACCOUNT环境变量"""
    account_str = os.getenv("RAINYUN_ACCOUNT")
    if not account_str:
        logger.error("❌ 未配置RAINYUN_ACCOUNT环境变量!格式应为[[账号1,密码1],[账号2,密码2]]")
        sys.exit(1)
    
    try:
        # 解析JSON格式的账号列表(兼容单引号/双引号)
        account_str = account_str.replace("'", "\"")  # 统一为双引号
        accounts = json.loads(account_str)
        # 校验格式
        if not isinstance(accounts, list):
            raise ValueError("环境变量值不是列表类型")
        for idx, account in enumerate(accounts):
            if not isinstance(account, list) or len(account) != 2:
                raise ValueError(f"第{idx+1}个账号格式错误,应为[账号,密码]")
            if not account[0] or not account[1]:
                raise ValueError(f"第{idx+1}个账号/密码为空")
        logger.info(f"✅ 成功解析{len(accounts)}个账号")
        return accounts
    except json.JSONDecodeError as e:
        logger.error(f"❌ RAINYUN_ACCOUNT格式解析失败:{str(e)},请检查格式是否为合法JSON")
        sys.exit(1)
    except ValueError as e:
        logger.error(f"❌ RAINYUN_ACCOUNT格式错误:{str(e)}")
        sys.exit(1)

def sign_in_rainyun(username: str, password: str):
    """单账号签到核心逻辑(独立封装,支持多账号循环调用)"""
    driver = None
    try:
        logger.info(f"\n========== 开始处理账号:{username} ==========")
        # 随机延时(可选)
        delay = random.randint(0, CONFIG["max_delay"])
        delay_sec = random.randint(0, 60)
        logger.info(f"⏳ 随机延时 {delay} 分钟 {delay_sec} 秒")
        time.sleep(delay * 60 + delay_sec)

        # 初始化Selenium(每次新建实例,清空缓存)
        driver = init_selenium()
        inject_stealth_js(driver)
        wait = WebDriverWait(driver, CONFIG["timeout"])

        # 访问登录页
        logger.info("⏳ 发起登录请求")
        logger.info("🌐 访问雨云登录页")
        driver.get(CONFIG["rainyun_login_url"])
        logger.info(f"页面标题:{driver.title}")

        # 输入账号密码
        logger.info("⏳ 等待登录表单元素加载...")
        username_elem = wait.until(EC.visibility_of_element_located((By.NAME, "login-field")))
        password_elem = wait.until(EC.visibility_of_element_located((By.NAME, "login-password")))
        login_btn = wait.until(EC.element_to_be_clickable((By.XPATH, '//*[@id="app"]/div[1]/div[1]/div/div[2]/fade/div/div/span/form/button')))
        logger.info("📝 输入账号密码")
        username_elem.send_keys(username)
        password_elem.send_keys(password)
        login_btn.click()
        logger.info("⏳ 正在登录中,耗时较长请稍等……")
        time.sleep(3)

        # 处理登录验证码
        try:
            wait.until(EC.visibility_of_element_located((By.ID, "tcaptcha_iframe_dy")))
            logger.warning("⚠️ 触发登录验证码")
            driver.switch_to.frame("tcaptcha_iframe_dy")
            if not process_captcha(driver, wait):
                raise Exception("登录验证码验证失败")
        except TimeoutException:
            logger.info("✅ 未触发登录验证码")

        # 校验登录状态
        time.sleep(5)
        driver.switch_to.default_content()
        logger.info(f"当前页面: {driver.current_url}")
        logger.info(f"页面标题: {driver.title}")
        if driver.current_url != "https://app.rainyun.com/dashboard":
            raise Exception("登录失败!请检查账号密码或网络")
        user_name = driver.find_element(By.XPATH, '//*[@id="app"]/div[1]/nav/div[1]/ul/div[6]/li/a/div/div/p').text.strip()
        logger.info(f"✅ 账号登录成功:{user_name}")
        
        # 访问签到页
        logger.info("🌐 访问赚取积分页")
        driver.get(CONFIG["rainyun_earn_url"])
        driver.implicitly_wait(5)
        logger.info(f"当前页面: {driver.current_url}")
        logger.info(f"页面标题: {driver.title}")

        # 查找并点击签到按钮
        logger.info("🔍 查找每日签到按钮")
        earn_btn_qddiv = driver.find_element(By.XPATH, '//*[@id="app"]/div[1]/div[3]/div[2]/div/div/div[2]/div[2]/div/div/div/div[1]/div')
        earn_btn_qd = earn_btn_qddiv.find_element(By.XPATH, './/span[contains(text(),"每日签到")]')
        status_elem = earn_btn_qd.find_element(By.XPATH, './following-sibling::span[1]')
        status_text = status_elem.text.strip()

        if status_text == "领取奖励":
            earn_btn = status_elem.find_element(By.XPATH, './a')
            logger.info(f"📌 签到状态:{status_text},开始领取")
            earn_btn.click()

            # 处理签到验证码
            logger.info("⚠️ 触发签到验证码")
            driver.switch_to.frame("tcaptcha_iframe_dy")
            if not process_captcha(driver, wait):
                raise Exception("签到验证码验证失败")
            driver.switch_to.default_content()

            # 校验签到结果
            time.sleep(5)
            logger.info("✅ 签到奖励领取成功")
        else:
            logger.info(f"📌 签到状态:{status_text},无需重复签到")

        # 获取当前积分
        try:
            points_elem = driver.find_element(By.XPATH, '//*[@id="app"]/div[1]/div[3]/div[2]/div/div/div[2]/div[1]/div[1]/div/p/div/h3')
            current_points = int(''.join(re.findall(r'\d+', points_elem.text)))
            logger.info(f"💰 当前积分:{current_points}(约{current_points/2000:.2f}元)")
        except Exception as e:
            logger.warning(f"⚠️ 积分获取失败:{str(e)}")

        

    except Exception as e:
        logger.error(f"❌ 账号{username}处理失败:{str(e)}", exc_info=True)
    finally:
        # 关闭浏览器,彻底清空缓存
        if driver:
            try:
                driver.quit()
                logger.info(f"✅ 账号{username}浏览器已关闭")
            except Exception as e:
                logger.warning(f"⚠️ 关闭浏览器失败:{str(e)}")
        # 清理临时文件
        clean_temp()
    logger.info(f"\n========== 账号{username}处理完成 ==========\n")

def main():
    """主函数:解析多账号,依次执行签到"""
    init_logger()
    # 解析账号列表
    accounts = parse_accounts()
    # 依次处理每个账号
    for idx, (username, password) in enumerate(accounts, 1):
        logger.info(f"\n================= 处理第{idx}个账号 ==================")
        sign_in_rainyun(username, password)
        # 账号间间隔(可选)
        time.sleep(random.uniform(2, 5))
    
    logger.info("\n🎉 所有账号处理完成!")

if __name__ == "__main__":
    main()

五、最后

后面的设置定时任务之类的就懒得写啦,顺便我也懒得写通知之类的了,感谢前人的开发让我站于巨人肩膀上,要是有后来者优化了就更好了

哦,对了,懒得上传 github 了,就丢 linux.do 里好了


📌 转载信息
原作者:
T_ACGN
转载时间:
2026/1/24 07:01:47

背景

一开始是通过Api获取数据,但是最近他们增加X-Gnarly参数,而且在github上没有找有效的方案后,放弃api请求,改用页面爬取的方式。彻底避免参数加密校验。

我的环境

    python 3.11 
    selenium 4.39.0
    playwright 1.57.0

评论页面

实现啦抓取第一页和第二页的评论,你们要是抓更多页可以吧第二页改成循环。
执行脚本后会在当前目录生成一份json文件,里面是/api/comment/list/接口返回的数据。

 python3.11 comment_scraper.py "@mahi.islam.oliva/video/7565942090039954706"

image.png

代码如下:

import json
import time
import sys
import base64
import re,os
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import argparse



def merge_comments(first_page, second_page):
    """合并两页的评论数据"""
    merged_data = first_page.copy()
    if 'comments' in second_page:
        if 'comments' not in merged_data:
            merged_data['comments'] = []
        merged_data['comments'].extend(second_page['comments'])
    return merged_data

def extract_tiktok_filename(path: str) -> str:
    """
    从 TikTok 路径(如 '@username/video/123456')中提取 'username_123456'
    支持带或不带 @、带 URL 等情况
    """
    # 匹配模式:可选的 @ + 用户名(字母数字下划线.)+ /video/ + 数字ID
    match = re.search(r'@?([\w.]+)/video/(\d{16,})', path)
    if match:
        username = match.group(1)
        video_id = match.group(2)
        return f"{username}_{video_id}"
    else:
        # 如果格式不符,回退到清理后的通用方式
        safe = re.sub(r'[\\/:*?"<>|\s]+', '_', path.strip('@/'))
        return safe[:100]


class TiktokScraper:
    def __init__(self):
        self.comments_data = []
        self.setup_driver()


    def setup_driver(self):

        chrome_options = Options()
        chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})

        chrome_options.add_argument("--start-maximized")
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--headless=new")
        chrome_options.add_argument("--disable-dev-shm-usage")
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_argument("--disable-infobars")
        chrome_options.add_argument("--disable-extensions")
        chrome_options.add_argument("--disable-gpu")  # 减少 WebGL 差异(可选)
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
        chrome_options.add_argument('user-agent={0}'.format(user_agent))

        self.driver = webdriver.Chrome(options=chrome_options)

        self.driver.execute_cdp_cmd("Emulation.setDeviceMetricsOverride", {
            "width": 1440,
            "height": 900,
            "deviceScaleFactor": 2,  # macOS Retina
            "mobile": False
        })

        # 覆盖 WebGL 参数(关键!)
        self.driver.execute_cdp_cmd("Emulation.setHardwareConcurrencyOverride", {"hardwareConcurrency": 8})
        # 1. 设置基础 UA(CDP 安全方式)
        self.driver.execute_cdp_cmd("Emulation.setUserAgentOverride", {
            "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "platform": "MacIntel"
        })

        # 2. 用 JS 覆盖高级指纹(包括 userAgentData)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
#             delete navigator.__proto__.webdriver;

            Object.defineProperty(navigator, 'platform', { get: () => 'MacIntel' });
            Object.defineProperty(navigator, 'languages', { get: () => ['en-US', 'en'] });

            // 伪造 userAgentData
            if (!navigator.userAgentData) {
                Object.defineProperty(navigator, 'userAgentData', {
                    value: {
                        brands: [
                            { brand: "Chromium", version: "120" },
                            { brand: "Google Chrome", version: "120" },
                            { brand: "Not:A-Brand", version: "99" }
                        ],
                        mobile: false,
                        platform: "macOS",
                        getHighEntropyValues: async (hints) => ({
                            architecture: "x86_64",
                            model: "",
                            platform: "macOS",
                            platformVersion: "13.5",
                            uaFullVersion: "120.0.6099.0"
                        })
                    },
                    writable: false,
                    configurable: false
                });
            }
            """
        })

        # 覆盖 WebGL 渲染器(防指纹关键)
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
            "source": """
            const getParameter = WebGLRenderingContext.prototype.getParameter;
            WebGLRenderingContext.prototype.getParameter = function(param) {
                if (param === 37445) return 'Apple Inc.'; // UNMASKED_VENDOR_WEBGL
                if (param === 37446) return 'Apple GPU';   // UNMASKED_RENDERER_WEBGL
                return getParameter.call(this, param);
            };
            """
        })
        self.driver.execute_cdp_cmd("Emulation.setTimezoneOverride", {"timezoneId": "America/New_York"})
        self.driver.execute_cdp_cmd("Emulation.setLocaleOverride", {"locale": "en-US"})

    def extract_comment_response_from_logs(self):
        """从 performance 日志中提取评论 API 的完整响应"""
        try:
            logs = self.driver.get_log("performance")
        except Exception as e:
            print(f"获取日志失败: {e}")
            return None

        request_id_to_url = {}
        finished_request_ids = set()

        for entry in logs:
            try:
                message = json.loads(entry["message"])
                method = message.get("message", {}).get("method")
                params = message.get("message", {}).get("params", {})

                if method == "Network.responseReceived":
                    url = params.get("response", {}).get("url", "")
                    request_id = params.get("requestId")
                    if request_id and re.search(r'comment.*list|comments.*aweme', url, re.I):
                        request_id_to_url[request_id] = url

                elif method == "Network.loadingFinished":
                    request_id = params.get("requestId")
                    if request_id:
                        finished_request_ids.add(request_id)
            except Exception:
                continue

        for req_id, url in request_id_to_url.items():
            if req_id in finished_request_ids:
                try:
                    body = self.driver.execute_cdp_cmd(
                        "Network.getResponseBody",
                        {"requestId": req_id}
                    )
                    raw = body.get("body", "{}")
                    if body.get("base64Encoded"):
                        raw = base64.b64decode(raw).decode("utf-8")
                    data = json.loads(raw)
                    if isinstance(data, dict) and ("comments" in data or "item_comments" in data):
                        print(f"✅ 捕获评论接口: {url}")
                        return data
                except Exception as e:
                    print(f"获取响应体失败 (req_id={req_id}): {e}")

        return None

    def scroll_comment_section(self):
        """在 .TUXTabBar-content 内部查找并滚动真正的评论列表容器"""
        script = """
            const tabContent = document.querySelector('.TUXTabBar-content');
            if (!tabContent) {
                console.log('❌ .TUXTabBar-content not found');
                return false;
            }

            // 获取所有子 div
            const candidates = Array.from(tabContent.querySelectorAll('div'));

            // 按 DOM 层级深度排序(优先选深层级的,通常是列表)
            candidates.sort((a, b) => {
                let depthA = 0, depthB = 0;
                let p = a; while (p && p !== tabContent) { depthA++; p = p.parentElement; }
                p = b; while (p && p !== tabContent) { depthB++; p = p.parentElement; }
                return depthB - depthA; // 深的优先
            });

            for (const el of candidates) {
                const style = window.getComputedStyle(el);
                const overflowY = style.overflowY;
                // 必须满足:可滚动 + 有溢出内容
                if ((overflowY === 'auto' || overflowY === 'scroll') &&
                    el.scrollHeight > el.clientHeight) {
                    el.scrollTop = el.scrollHeight+100;
                    console.log('✅ Scrolled real comment container');
                    return true;
                }
            }

            console.log('⚠️ No scrollable child found in .TUXTabBar-content');
            return false;
        """
        try:
            result = self.driver.execute_script(script)
            return result is True
        except Exception as e:
            print(f"滚动执行异常: {e}")
            return False

    def auto_play_and_load_more_comments(self, user_input):

        url = 'https://www.tiktok.com/' + user_input
        print(f"打开视频页面: {url}")
        self.driver.get(url)
        wait = WebDriverWait(self.driver, 20)
        # wait.until(EC.presence_of_element_located((By.TAG_NAME, "video")))
        # 等待评论tab加载完毕
        # wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "div.TUXTabBar-list")))
        wait.until(EC.presence_of_element_located((By.XPATH, "//span[@data-e2e='comment-icon']")))

        print("视频评论已加载")

        # 点击评论按钮
        try:
            comment_span = wait.until(
                EC.element_to_be_clickable((By.XPATH, '//span[@data-e2e="comment-icon"]'))
            )
            print("正在点击评论图标 (span[@data-e2e='comment-icon'])...")
            self.driver.execute_script("arguments[0].click();", comment_span)
        except Exception as e:
            print(f"无法点击评论按钮: {e}")
            return

#         time.sleep(2)
#         debug_prefix = extract_tiktok_filename(user_input)
#         try:
#             # 保存 HTML
#             with open(f"{debug_prefix}_after_click.html", "w", encoding="utf-8") as f:
#                 f.write(self.driver.page_source)
#             print(f"页面 HTML 已保存: {debug_prefix}_after_click.html")
#
#             # 保存截图
#             self.driver.save_screenshot(f"{debug_prefix}_after_click.png")
#             print(f"页面截图已保存: {debug_prefix}_after_click.png")
#         except Exception as e:
#             print(f"保存调试文件失败: {e}")

        # 加载第一页评论
        first_page_data = self.wait_for_comments(10)
        if not first_page_data:
            print("未捕获到第一页评论")
            return
        self.comments_data.append(first_page_data)

        # 模拟滚动加载更多评论
        # self.driver.execute_script("document.querySelector('.TUXTabBar-content').scrollTo(0, document.querySelector('.TUXTabBar-content').scrollHeight);")
        # 改为调用新方法
        time.sleep(1)
        if self.scroll_comment_section():
            print("已滚动加载更多评论...")
            time.sleep(1)  # 等待新评论加载
        else:
            print("无法滚动评论区,可能结构变化")

        # 加载第二页评论
        second_page_data = self.wait_for_comments(10)
        if second_page_data:
            # 假设每页返回的数据结构相似,合并 comments 字段
            merged_comments = merge_comments(first_page_data, second_page_data)
        else:
            merged_comments = first_page_data
            print("未捕获到第二页评论")


        filename = f"{extract_tiktok_filename(user_input)}.json"
        print(filename)
        with open(filename, "w", encoding="utf-8") as f:
            json.dump(merged_comments, f, ensure_ascii=False, indent=2)
        print(f"评论数据已保存到: {filename}")
        print(f"   共 {len(merged_comments.get('comments', []))} 条评论")

    def wait_for_comments(self, timeout_seconds=10):
        """等待并捕获评论API响应"""
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            comment_data = self.extract_comment_response_from_logs()
            if comment_data:
                return comment_data
            time.sleep(0.5)
        return None

    def close(self):
        if hasattr(self, "driver"):
            self.driver.quit()


def main():
    parser = argparse.ArgumentParser(
        description="Scrape TikTok comments via /api/comment/list/ ")
    parser.add_argument(
        "video_input",
        help="TikTok video URL or video_id, e.g., '/@user/video/7318855966163275054' "
    )
    args = parser.parse_args()

    video_input = args.video_input.strip()
    print(video_input)

    if not video_input:
        print("Error: Video input cannot be empty")
        sys.exit(1)


    scraper = TiktokScraper()
    try:
        scraper.auto_play_and_load_more_comments(video_input)
        time.sleep(1)  # 保持窗口打开以便观察
    finally:
        scraper.close()

    sys.exit(0)

if __name__ == "__main__":
    main()

用户页面发布的视频

这里只实现啦只第一页接口的数据, /api/post/item_list/把这个接口的数据放到啦一个json文件中。
这个页面我做了根据cookie的登陆,其实不登陆应该也可以。cookie 文件是通过chrome扩展 Cookies.txt 生成。登陆TikTok后点击这个扩展下载文件下来就行。
image.png

python3.11 post_item_list.py @dlw2026
image.png

post_item_list.py 代码如下:

# scraper.py
import asyncio
import json
import sys
import argparse
from playwright.async_api import async_playwright
from cookies import load_cookies_safely


# 这是用来抓取用户主页的 /api/post/item_list/


async def scrape_tiktok_user(username):
    target_responses = []
    clean_username = username.lstrip("@")
    output_json = clean_username + "_posts.json"
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(
            viewport={"width": 1920, "height": 1080},
            user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
        )

        cookies = load_cookies_safely()
        await context.add_cookies(cookies)
        page = await context.new_page()

         # 隐藏自动化特征
        await page.add_init_script("""
            // 隐藏 webdriver 标志
            delete navigator.__proto__.webdriver;
            window.chrome = { runtime: {} };
            // 伪造 platform 为 Mac
            Object.defineProperty(navigator, 'platform', {
                get: () => 'MacIntel'
            });
            // 伪造 userAgentData(高熵指纹)
            if (!navigator.userAgentData) {
                Object.defineProperty(navigator, 'userAgentData', {
                    value: {
                        brands: [
                            { brand: "Chromium", version: "120" },
                            { brand: "Google Chrome", version: "120" },
                            { brand: "Not:A-Brand", version: "99" }
                        ],
                        mobile: false,
                        platform: "macOS",
                        getHighEntropyValues: async (hints) => ({
                            architecture: "x86_64",
                            model: "",
                            platform: "macOS",
                            platformVersion: "13.5",
                            uaFullVersion: "120.0.6099.0"
                        })
                    },
                    writable: false,
                    configurable: false
                });
            }
        """)

        # ✅ 关键:宽松匹配 API(不再检查 content-type)
        def handle_response(response):
            url = response.url
            if (
                    "/api/post/item_list/" in url
                    and response.status == 200
                    and "tiktok.com" in url
            ):
                if not target_responses:
                    target_responses.append(response)
                    print(f"捕获 API: {url.split('?')[0]}")

        page.on("response", handle_response)

        url = f"https://www.tiktok.com/{username}"
        print(f"打开页面: {url}")
        await page.goto(url, wait_until="domcontentloaded", timeout=50000)

        # 等待用户信息出现
        try:
            await page.wait_for_selector('h1[data-e2e="user-title"]', timeout=15000)
            print("用户主页加载成功")
        except:
            print("用户信息未加载,继续尝试...")

        # 滚动一下,触发懒加载(重要!)
        await page.evaluate("window.scrollTo(0, document.body.scrollHeight / 3)")

        # 等待 API(最多 20 秒)
        for i in range(40):
            if target_responses:
                break
            if i % 10 == 0:
                print(f"⏳ 等待 API 中... ({i * 0.5}s)")
            await asyncio.sleep(0.5)

        api_data = None
        if target_responses:
            try:
                api_data = await target_responses[0].json()
                print("✅ 成功解析 JSON 数据")
            except Exception as e:
                # 如果 .json() 失败,可能是 text/plain,手动解析
                try:
                    text = await target_responses[0].text()
                    api_data = json.loads(text)
                    print("✅ 通过 .text() 成功解析 JSON")
                except:
                    print(f"❌ 完全无法解析响应: {e}")

        if api_data:
            with open(output_json, "w", encoding="utf-8") as f:
                json.dump(api_data, f, ensure_ascii=False, indent=2)
            items = api_data.get("itemList", [])
            print(f"抓取到 {len(items)} 个视频,已保存至 {output_json}")
        else:
            print("未捕获到任何 API 数据")
            # 调试:打印所有请求(可选)
#             await page.route("**/*", lambda route: print("REQ:", route.request.url) or route.continue_())
#         screenshot_path = f"{clean_username}_homepage.png"
#         await page.screenshot(path=screenshot_path, full_page=True)
#         print(f"已保存页面截图: {screenshot_path}")

        await page.wait_for_timeout(5000)
        await browser.close()
        if api_data:
            return True
        else:
            return False


def main():
    parser = argparse.ArgumentParser(description="Scrape TikTok user profile")
    parser.add_argument("username", help="TikTok username (with or without @), e.g., @dishilife or dishilife")
    args = parser.parse_args()

    username = args.username.strip()
    if not username:
        print("Error: Username cannot be empty")
        sys.exit(1)
    if not username.startswith('@'):
        username = '@' + username
    success = asyncio.run(scrape_tiktok_user(username))
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()

cookies.py脚本:

import os
from datetime import datetime



COOKIES_FILE = "cookies.txt"

def load_cookies_safely():
    filepath = COOKIES_FILE
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"❌ Cookie 文件不存在: {os.path.abspath(filepath)}")

    cookies = []
    current_ts = int(datetime.now().timestamp())
    tiktok_domains = {".tiktok.com", "www.tiktok.com"}

    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            parts = line.split("\t")
            if len(parts) < 7:
                continue

            domain = parts[0]
            if domain.startswith("#HttpOnly_"):
                domain = domain[len("#HttpOnly_"):]
            if not domain.startswith("."):
                domain = "." + domain.lstrip(".")

            if not any(t in domain for t in tiktok_domains):
                continue

            cookie = {
                "name": parts[5],
                "value": parts[6],
                "domain": domain,
                "path": parts[2],
                "secure": parts[3].upper() == "TRUE",
            }

            expires_str = parts[4]
            if expires_str.isdigit():
                expires = int(expires_str)
                if expires > current_ts:
                    cookie["expires"] = expires

            cookies.append(cookie)

    if not cookies:
        raise ValueError("❌ 未加载有效 Cookie!请确认包含 sessionid。")
    return cookies

if __name__ == "__main__":
    print('不可以直接执行')

Gemini Business 自动注册 & 2API 上传工具

基于大佬们的开源成果,整合了定时注册自动上传过期剔除等功能。实现全自动化的账号池维护。

核心逻辑优化

  • 自动维护:定时注册并直接传到 2API,自动剔除已过期账号,保留可用账号。
  • 失败重试:修改了注册机逻辑,设定申请 N 个,即使中间失败,也会一直重试直到成功申请到 N 个为止。
  • 丰俭由人:建议每 11 小时注册 2-3 个即可满足个人使用。避免对随机邮箱大佬提供不必要的压力。


感谢各位大佬的无私奉献:


环境准备

请确保已安装 Python 环境,并安装以下依赖库:

pip install undetected-chromedriver selenium beautifulsoup4 requests pystray pillow

配置说明

1. 先去 hf 部署一个 2api, 记住 API 地址admin_key
2. 新建并复制代码生成 py 文件,右键编辑,在顶部的配置区域填入你的 2API 信息:

 # 服务器 API 配置 API_HOST = "请输入你的服务器API地址" ADMIN_KEY = "请输入你的管理员密钥" # 无头模式开关 (True=后台运行无窗口, False=显示浏览器窗口) HEADLESS_MODE = True ##无头注册率会低点,但胜在静默,结合重试其实体验更好。 

运行脚本

在命令行中执行:

py gemini_auto.py

代码如下:

gemini_auto.py
"""
Gemini Business 自动注册上传工具
"""

# 标准库
import sys
import json
import time
import random
from pathlib import Path
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse, parse_qs
from concurrent.futures import ThreadPoolExecutor

# 第三方库
import requests
from bs4 import BeautifulSoup
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


# ==================== 配置区域 ====================
# 服务器 API 配置
API_HOST = "请输入你的服务器API地址"
ADMIN_KEY = "请输入你的管理员密钥"

# 临时邮箱 API 配置
MAIL_API = "https://mail.chatgpt.org.uk"
MAIL_KEY = "gpt-test"

# Gemini 登录页面
LOGIN_URL = "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZS1hY2JkLThmOGY2ZDE0ODM1Mg"

# 本地账号文件
ACCOUNTS_FILE = "accounts.json"

# 页面元素定位
XPATH = {
    "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
    "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
    "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button",
}

# 随机姓名池
NAMES = [
    "James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones",
    "David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez",
    "Elizabeth Taylor", "Richard Moore", "Susan Wilson", "Joseph Anderson", "Jessica Thomas",
    "Charles Jackson", "Sarah White", "Christopher Harris", "Karen Martin", "Daniel Thompson",
    "Thomas Garcia", "Nancy Martinez", "Matthew Robinson", "Lisa Clark", "Anthony Lewis",
    "Betty Walker", "Mark Young", "Margaret Allen", "Donald King", "Sandra Wright"
]

# 全局停止标志 (用于 GUI 停止任务)
STOP_FLAG = False

# 无头模式开关 (True=后台运行无窗口, False=显示浏览器窗口)
HEADLESS_MODE = True
# ==================================================


# ==================== 工具函数 ====================
def print_log(msg, level="INFO"):
    """统一日志输出格式"""
    icons = {"INFO": "→", "WARN": "⚠", "ERROR": "✗", "OK": "✓"}
    icon = icons.get(level, "•")
    print(f"{icon} {msg}")


def print_separator(char="=", length=80):
    """打印分隔线"""
    print(char * length)


def print_progress(current, total, success, fail, avg_time):
    """打印进度信息"""
    print(f"\n>>> 进度: {current}/{total} | 成功: {success} | 失败: {fail} | 平均耗时: {avg_time:.1f}s")


def log_error(email, error_msg):
    """记录错误到日志文件"""
    error_file = Path("errors.log")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"[{timestamp}] 邮箱: {email} | 错误: {error_msg}\n"
    
    try:
        with open(error_file, "a", encoding="utf-8") as f:
            f.write(log_entry)
        print_log(f"错误已记录到 errors.log", "INFO")
    except Exception as e:
        print_log(f"写入错误日志失败: {e}", "WARN")


# ==================== 邮箱管理 ====================
email_queue = []


def create_temp_email():
    """创建临时邮箱地址"""
    try:
        response = requests.get(
            f"{MAIL_API}/api/generate-email",
            headers={"X-API-Key": MAIL_KEY},
            timeout=30
        )
        if response.status_code == 200 and response.json().get('success'):
            email = response.json()['data']['email']
            return email
    except Exception as e:
        print_log(f"邮箱服务异常: {e}", "❌")
    return None


def prefetch_email():
    """预创建邮箱并加入队列"""
    email = create_temp_email()
    if email:
        email_queue.append(email)


def get_email():
    """获取邮箱地址(优先使用队列中的)"""
    if email_queue:
        email = email_queue.pop(0)
        print_log(f"邮箱就绪 → {email}")
        return email
    
    email = create_temp_email()
    if email:
        print_log(f"已生成 → {email}")
    return email


def fetch_verification_code(email, timeout=60):
    """获取邮箱验证码"""
    print_log("等待邮件验证码...")
    start_time = time.time()
    
    while time.time() - start_time < timeout:
        try:
            response = requests.get(
                f"{MAIL_API}/api/emails",
                params={"email": email},
                headers={"X-API-Key": MAIL_KEY},
                timeout=10
            )
            
            if response.status_code == 200:
                emails = response.json().get('data', {}).get('emails', [])
                if emails:
                    html_content = emails[0].get('html_content') or emails[0].get('content', '')
                    soup = BeautifulSoup(html_content, 'html.parser')
                    code_element = soup.find('span', class_='verification-code')
                    
                    if code_element:
                        code = code_element.get_text().strip()
                        if len(code) == 6:
                            print_log(f"验证码 → {code}", "OK")
                            return code
        except:
            pass
        
        elapsed = int(time.time() - start_time)
        print(f"  等待中... ({elapsed}s)", end='\r')
        time.sleep(2)
    
    print_log("验证码超时,请检查网络", "ERROR")
    return None


# ==================== 账号注册 ====================
def save_account_config(email, driver, timeout=10):
    """提取并保存账号配置信息"""
    print_log(f"提取账号配置中(最多 {timeout}s)...")
    start_time = time.time()
    account_data = None

    while time.time() - start_time < timeout:
        cookies = driver.get_cookies()
        current_url = driver.current_url
        parsed_url = urlparse(current_url)

        # 提取 config_id
        url_parts = current_url.split('/')
        config_id = None
        for i, part in enumerate(url_parts):
            if part == 'cid' and i + 1 < len(url_parts):
                config_id = url_parts[i + 1].split('?')[0]
                break

        # 提取关键 cookies
        cookie_map = {c['name']: c for c in cookies}
        session_cookie = cookie_map.get('__Secure-C_SES', {})
        host_cookie = cookie_map.get('__Host-C_OSES', {})

        # 提取 csesidx
        csesidx = parse_qs(parsed_url.query).get('csesidx', [None])[0]

        # 验证所有必需字段
        if all([
            session_cookie.get('value'),
            host_cookie.get('value'),
            csesidx,
            config_id
        ]):
            expiry_timestamp = session_cookie.get('expiry', 0) - 43200
            expires_at = datetime.fromtimestamp(expiry_timestamp).strftime('%Y-%m-%d %H:%M:%S') if expiry_timestamp > 0 else None
            
            account_data = {
                "id": email,
                "csesidx": csesidx,
                "config_id": config_id,
                "secure_c_ses": session_cookie.get('value'),
                "host_c_oses": host_cookie.get('value'),
                "expires_at": expires_at
            }
            
            elapsed = time.time() - start_time
            print_log(f"配置提取完成 ({elapsed:.1f}s)", "OK")
            break

        time.sleep(1)

    if not account_data:
        print_log(f"配置不完整,已跳过 → {email}", "WARN")
        return None

    # 保存到文件
    existing_accounts = []
    if Path(ACCOUNTS_FILE).exists():
        try:
            with open(ACCOUNTS_FILE, 'r', encoding='utf-8') as f:
                existing_accounts = json.load(f)
        except:
            pass
    
    existing_accounts.append(account_data)
    
    with open(ACCOUNTS_FILE, 'w', encoding='utf-8') as f:
        json.dump(existing_accounts, f, indent=2, ensure_ascii=False)
    
    print_log(f"已保存 → {ACCOUNTS_FILE}", "OK")
    return account_data


def fast_type(element, text, delay=0.02):
    """快速输入文本"""
    for c in text:
        element.send_keys(c)
        time.sleep(delay)


def register_single_account(driver, executor):
    """注册单个账号 (来自 app.py 的简洁版本)"""
    start_time = time.time()
    email = get_email()
    if not email:
        return None, False, None, 0

    wait = WebDriverWait(driver, 30)

    try:
        # 1. 访问登录页
        driver.get(LOGIN_URL)
        
        # 检测空白页
        time.sleep(2)
        page_source = driver.page_source
        if len(page_source) < 500 or "about:blank" in driver.current_url:
            raise Exception("页面加载空白,需要重启浏览器")

        # 2. 输入邮箱
        print_log("输入邮箱...")
        inp = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["email_input"])))
        inp.click()
        inp.clear()
        fast_type(inp, email)
        
        # 验证邮箱是否成功输入
        time.sleep(0.3)
        actual_value = inp.get_attribute("value")
        if actual_value != email:
            print_log(f"输入验证失败,清空后重新输入...", "WARN")
            # 清空后用 JS 输入
            driver.execute_script("arguments[0].value = '';", inp)
            time.sleep(0.1)
            driver.execute_script("arguments[0].value = arguments[1];", inp, email)
            # 触发 input 事件
            driver.execute_script("""
                var event = new Event('input', { bubbles: true });
                arguments[0].dispatchEvent(event);
            """, inp)
            time.sleep(0.3)
        
        print_log(f"邮箱 → {email}", "OK")

        # 3. 点击继续
        time.sleep(0.5)
        btn = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["continue_btn"])))
        driver.execute_script("arguments[0].click();", btn)
        print_log("继续下一步", "OK")

        # 异步预创建下一个邮箱
        executor.submit(prefetch_email)

        # 4. 获取验证码
        time.sleep(2)
        code = fetch_verification_code(email)
        if not code:
            return email, False, None, time.time() - start_time

        # 5. 输入验证码
        time.sleep(1)
        print_log(f"输入验证码 → {code}")
        try:
            pin = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='pinInput']")))
            pin.click()
            time.sleep(0.1)
            fast_type(pin, code, 0.05)
        except:
            try:
                span = driver.find_element(By.CSS_SELECTOR, "span[data-index='0']")
                span.click()
                time.sleep(0.2)
                driver.switch_to.active_element.send_keys(code)
            except Exception as e:
                print_log(f"验证码输入失败: {e}", "ERROR")
                return email, False, None, time.time() - start_time

        # 6. 点击验证
        time.sleep(0.5)
        try:
            vbtn = driver.find_element(By.XPATH, XPATH["verify_btn"])
            driver.execute_script("arguments[0].click();", vbtn)
        except:
            for btn in driver.find_elements(By.TAG_NAME, "button"):
                if '验证' in btn.text:
                    driver.execute_script("arguments[0].click();", btn)
                    break
        print_log("提交验证", "OK")

        # 7. 输入姓名
        print_log("等待姓名输入...")
        selectors = [
            "input[formcontrolname='fullName']",
            "input[placeholder='全名']",
            "input[placeholder='Full name']",
            "input#mat-input-0",
        ]
        name_inp = None

        # 轮询检测姓名输入框
        for _ in range(30):
            for sel in selectors:
                try:
                    name_inp = driver.find_element(By.CSS_SELECTOR, sel)
                    if name_inp.is_displayed():
                        break
                except:
                    continue
            if name_inp and name_inp.is_displayed():
                break
            time.sleep(1)

        if name_inp and name_inp.is_displayed():
            name = random.choice(NAMES)
            name_inp.click()
            time.sleep(0.2)
            name_inp.clear()
            fast_type(name_inp, name)
            print_log(f"姓名 → {name}", "OK")
            time.sleep(0.3)
            name_inp.send_keys(Keys.ENTER)
            time.sleep(1)
        else:
            print_log("未找到姓名输入框", "ERROR")
            return email, False, None, time.time() - start_time

        # 8. 等待进入工作台
        print_log("等待工作台...")
        for _ in range(30):
            time.sleep(1)
            url = driver.current_url
            if 'business.gemini.google' in url and '/cid/' in url:
                print_log("工作台加载完成", "OK")
                break
        else:
            print_log(f"未跳转到工作台 → {driver.current_url}", "WARN")

        # 9. 保存配置
        elapsed = time.time() - start_time
        config = save_account_config(email, driver)
        if config:
            print_log(f"注册成功 → {email} (耗时 {elapsed:.1f}s)", "OK")
            return email, True, config, elapsed
        return email, False, None, elapsed

    except Exception as e:
        print_log(f"注册异常: {e}", "ERROR")
        log_error(email, str(e))
        return email, False, None, time.time() - start_time


# ==================== 账号上传 ====================
class AccountUploader:
    """账号上传管理类"""
    
    def __init__(self, api_host, admin_key):
        self.api_host = api_host.rstrip('/')
        self.admin_key = admin_key
        self.session = requests.Session()
        
    def login(self):
        """登录到服务器"""
        print_log("连接服务器中...")
        login_url = f"{self.api_host}/login"
        
        try:
            response = self.session.post(
                login_url,
                data={"admin_key": self.admin_key},
                allow_redirects=True,
                timeout=30
            )
            
            if len(self.session.cookies) > 0:
                print_log("服务器连接成功", "OK")
                return True
            
            if response.status_code == 200 and '登录' in response.text:
                print_log("密钥验证失败", "ERROR")
                return False
            
            print_log("服务器连接失败", "ERROR")
            return False
                
        except Exception as e:
            print_log(f"连接异常: {e}", "ERROR")
            return False
    
    def upload_and_replace(self, file_path):
        """覆盖上传账号配置"""
        if not Path(file_path).exists():
            print_log(f"文件不存在 → {file_path}", "ERROR")
            return False
        
        print_log(f"读取本地文件 → {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                accounts_data = json.load(f)
        except Exception as e:
            print_log(f"文件读取异常: {e}", "ERROR")
            return False
        
        print_log(f"本地账号 → {len(accounts_data)} 个")
        print_log("开始上传...")
        
        upload_url = f"{self.api_host}/accounts-config"
        
        try:
            response = self.session.put(
                upload_url,
                json=accounts_data,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                print_log("上传完成!", "OK")
                print_log(f"{result.get('message', '配置已更新')}")
                print_log(f"服务器账号 → {result.get('account_count', len(accounts_data))} 个")
                
                print()
                print_separator()
                print_log("正在获取服务器账号状态...")
                print_separator()
                self.view_accounts()
                
                return True
            else:
                print_log(f"上传失败,状态码: {response.status_code}", "ERROR")
                return False
                
        except Exception as e:
            print_log(f"上传异常: {e}", "ERROR")
            return False
    
    def upload_and_merge(self, file_path):
        """合并上传账号配置(保留远程正常账号)"""
        print_log("智能合并模式启动...")
        
        # 读取本地账号
        if not Path(file_path).exists():
            print_log(f"本地文件缺失 → {file_path}", "ERROR")
            return False
        
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                local_accounts = json.load(f)
            print_log(f"本地账号 → {len(local_accounts)} 个")
        except Exception as e:
            print_log(f"读取本地文件失败: {e}", "ERROR")
            return False
        
        # 获取远程账号配置
        print_log("获取远程配置...")
        config_url = f"{self.api_host}/accounts-config"
        
        try:
            response = self.session.get(config_url, timeout=30)
            if response.status_code == 200:
                remote_config = response.json()
                remote_accounts = remote_config.get('accounts', [])
                print_log(f"远程账号 → {len(remote_accounts)} 个")
            else:
                print_log("远程配置获取失败,仅上传本地", "WARN")
                remote_accounts = []
        except Exception as e:
            print_log(f"远程连接异常: {e},仅上传本地", "WARN")
            remote_accounts = []
        
        # 筛选远程正常账号(未过期、未禁用)
        valid_remote_accounts = []
        for account in remote_accounts:
            if account.get('disabled', False):
                continue
            
            expires_at = account.get('expires_at')
            if expires_at and expires_at != '未设置':
                try:
                    beijing_tz = timezone(timedelta(hours=8))
                    expire_time = datetime.strptime(expires_at, "%Y-%m-%d %H:%M:%S")
                    expire_time = expire_time.replace(tzinfo=beijing_tz)
                    current_time = datetime.now(beijing_tz)
                    if expire_time <= current_time:
                        continue
                except:
                    pass
            
            valid_remote_accounts.append(account)
        
        print_log(f"有效远程账号 → {len(valid_remote_accounts)} 个")
        
        # 合并账号(去重)
        merged_accounts = list(valid_remote_accounts)
        remote_ids = {acc.get('id') for acc in valid_remote_accounts}
        
        new_count = 0
        for local_account in local_accounts:
            local_id = local_account.get('id')
            if local_id not in remote_ids:
                merged_accounts.append(local_account)
                new_count += 1
        
        print_log(f"合并结果 → 保留 {len(valid_remote_accounts)} 个,新增 {new_count} 个,共 {len(merged_accounts)} 个")
        
        # 上传合并后的配置
        print_log("上传合并配置...")
        upload_url = f"{self.api_host}/accounts-config"
        
        try:
            response = self.session.put(
                upload_url,
                json=merged_accounts,
                timeout=30
            )
            
            if response.status_code == 200:
                result = response.json()
                print_log("合并上传完成!", "OK")
                print_log(f"{result.get('message', '配置已更新')}")
                print_log(f"服务器账号 → {result.get('account_count', len(merged_accounts))} 个")
                
                print()
                print_separator()
                print_log("正在获取服务器账号状态...")
                print_separator()
                self.view_accounts()
                
                return True
            else:
                print_log(f"上传失败,状态码: {response.status_code}", "ERROR")
                return False
                
        except Exception as e:
            print_log(f"上传异常: {e}", "ERROR")
            return False
    
    def view_accounts(self):
        """查看远程账号状态"""
        print_log("查询远程账号...")
        
        view_url = f"{self.api_host}/accounts"
        
        try:
            response = self.session.get(view_url, timeout=30)
            
            if response.status_code == 200:
                data = response.json()
                accounts = data.get('accounts', [])
                total = data.get('total', len(accounts))
                
                if not accounts:
                    print_log("远程无账号配置", "INFO")
                    return True
                
                print(f"\n共 {total} 个账号")
                print_separator("=", 120)
                
                # 表头
                print(f"{'序号':<6} {'账号ID':<35} {'状态':<12} {'过期时间':<22} {'剩余时长':<15} {'累计对话':<10}")
                print_separator("-", 120)
                
                # 账号列表
                for i, account in enumerate(accounts, 1):
                    acc_id = account.get('id', 'N/A')
                    status = account.get('status', 'N/A')
                    expires_at = account.get('expires_at', '未设置')
                    remaining = account.get('remaining_display', 'N/A')
                    conversations = account.get('conversation_count', 0)
                    
                    if len(acc_id) > 33:
                        acc_id = acc_id[:30] + "..."
                    
                    print(f"{i:<6} {acc_id:<35} {status:<12} {expires_at:<22} {remaining:<15} {conversations:<10}")
                
                print_separator("=", 120)
                return True
            else:
                print_log(f"查询失败 → 状态码 {response.status_code}", "ERROR")
                return False
                
        except Exception as e:
            print_log(f"查询异常: {e}", "ERROR")
            return False


# ==================== 主程序流程 ====================
def run_batch_registration(target_count):
    """批量注册账号 (保底成功数模式)"""
    print()
    print_separator()
    print(f"目标: 成功注册 {target_count} 个账号")
    print_separator()
    print()
    
    # 清空旧文件
    if Path(ACCOUNTS_FILE).exists():
        Path(ACCOUNTS_FILE).unlink()
        print_log(f"已清空 → {ACCOUNTS_FILE}")
    
    driver = None
    executor = ThreadPoolExecutor(max_workers=2)
    success_count = 0
    fail_count = 0
    attempt_count = 0
    total_time = 0
    success_times = []

    # 预创建第一个邮箱
    executor.submit(prefetch_email)
    
    # 连续失败计数器(用于保护机制)
    consecutive_fails = 0
    MAX_CONSECUTIVE_FAILS = 20

    # 循环直到成功数达到目标
    while success_count < target_count:
        # 检查全局停止标志
        global STOP_FLAG
        if STOP_FLAG:
            print_log("收到停止信号,中止任务", "WARN")
            STOP_FLAG = False  # 重置标志
            break
        
        # 连续失败保护
        if consecutive_fails >= MAX_CONSECUTIVE_FAILS:
            print_log(f"连续失败 {MAX_CONSECUTIVE_FAILS} 次,中止本轮任务", "ERROR")
            break
        
        attempt_count += 1
        current_target = target_count + fail_count  # 动态调整显示的总数
        
        print()
        print_separator("#", 60)
        print(f"正在注册第 {attempt_count} 个账号 (成功: {success_count}/{target_count})")
        print_separator("#", 60)
        print()

        # 确保浏览器可用
        if driver is None:
            options = uc.ChromeOptions()
            if HEADLESS_MODE:
                print_log("启动无头浏览器...")
                options.add_argument("--headless=new")
                options.add_argument("--disable-gpu")
                options.add_argument("--no-sandbox")
                options.add_argument("--window-size=1200,800")
            else:
                print_log("启动浏览器...")
            driver = uc.Chrome(options=options, use_subprocess=True)
            if not HEADLESS_MODE:
                driver.set_window_size(100, 200)
                driver.set_window_position(50, 50)
            time.sleep(1)
        else:
            try:
                _ = driver.current_url
            except:
                print_log("浏览器已关闭,重启中...")
                try: 
                    driver.quit()
                except: 
                    pass
                options = uc.ChromeOptions()
                if HEADLESS_MODE:
                    options.add_argument("--headless=new")
                    options.add_argument("--disable-gpu")
                    options.add_argument("--no-sandbox")
                    options.add_argument("--window-size=1200,800")
                driver = uc.Chrome(options=options, use_subprocess=True)
                if not HEADLESS_MODE:
                    driver.set_window_size(100, 200)
                    driver.set_window_position(50, 50)
                time.sleep(1)

        try:
            email, success, config, elapsed = register_single_account(driver, executor)
            total_time += elapsed
            
            if success and config:
                success_count += 1
                success_times.append(elapsed)
                consecutive_fails = 0  # 重置连续失败计数
                print_log(f"进度: {success_count}/{target_count} 完成", "OK")
            else:
                fail_count += 1
                consecutive_fails += 1
                print_log(f"失败 +1 (连续失败: {consecutive_fails}/{MAX_CONSECUTIVE_FAILS})", "WARN")
                
        except Exception as e:
            error_msg = str(e).lower()
            print_log(f"注册异常: {e}", "ERROR")
            fail_count += 1
            consecutive_fails += 1
            
            # 检测空白页或页面加载问题
            if "blank" in error_msg or "timeout" in error_msg or "element" in error_msg:
                print_log("检测到页面异常,重启浏览器...", "WARN")
                if driver:
                    try: 
                        driver.quit()
                    except: 
                        pass
                    driver = None
            elif driver:
                try: 
                    driver.quit()
                except: 
                    pass
                driver = None

        avg_time = total_time / attempt_count if total_time > 0 else 0
        print_progress(success_count, target_count, success_count, fail_count, avg_time)

        if success_count < target_count and driver:
            try:
                driver.delete_all_cookies()
            except:
                pass
            time.sleep(random.randint(2, 3))

    executor.shutdown(wait=False)
    if driver:
        try: 
            driver.quit()
        except: 
            pass
        
        # Monkeypatch: 防止 __del__ 再次调用 quit 导致 WinError 6
        try:
            driver.quit = lambda: None
        except:
            pass
            
        driver = None

    # 统计信息
    avg_time = sum(success_times) / len(success_times) if success_times else 0
    min_time = min(success_times) if success_times else 0
    max_time = max(success_times) if success_times else 0
    
    print()
    print_separator()
    print(f"注册完成! 目标: {target_count}, 成功: {success_count}, 失败: {fail_count}, 总尝试: {attempt_count}")
    print(f"总耗时: {total_time:.1f}s | 平均: {avg_time:.1f}s | 最快: {min_time:.1f}s | 最慢: {max_time:.1f}s")
    print(f"账号已保存至: {ACCOUNTS_FILE}")
    print_separator()
    
    return {
        "success": success_count,
        "fail": fail_count,
        "attempts": attempt_count,
        "avg_time": avg_time,
        "success_times": success_times,
        "is_ok": success_count > 0
    }


def handle_task_execution(count, upload_mode, uploader):
    """执行一次完整的任务(注册+上传)"""
    stats = run_batch_registration(count)
    
    if stats.get('is_ok'):
        print()
        # 先登录再上传
        if not uploader.login():
            print_log("服务器登录失败,无法上传", "ERROR")
            return stats
        
        if upload_mode == 'replace':
            print_log("开始覆盖上传到服务器...")
            uploader.upload_and_replace(ACCOUNTS_FILE)
        elif upload_mode == 'merge':
            print_log("开始合并上传到服务器...")
            uploader.upload_and_merge(ACCOUNTS_FILE)
    else:
        print_log("注册流程未成功,取消上传", "WARN")
        
    return stats


def main():
    """主程序入口"""
    print_separator()
    print("Gemini Business 自动注册上传工具")
    print_separator()
    print()
    
    uploader = AccountUploader(API_HOST, ADMIN_KEY)
    
    # 登录服务器
    if not uploader.login():
        print_log("登录失败,无法继续", "ERROR")
        input("\n按回车键退出...")
        sys.exit(1)
    
    print()
    
    while True:
        print("\n请选择操作:")
        print("  1. 注册上传")
        print("  2. 查看远程账号状态")
        print("  3. 退出")
        print()
        
        choice = input("请输入选项 (1-3): ").strip()
        
        if choice == "1":
            # 确定上传模式
            upload_mode = 'merge'
            
            # 1. 询问数量
            count_str = input("\n请输入注册数量 (默认 5): ").strip()
            count = int(count_str) if count_str else 5
            
            # 2. 询问执行模式
            print("\n请选择执行模式:")
            print("  1. 立即执行一次")
            print("  2. 定时循环执行 (支持自定义间隔)")
            mode_choice = input("请输入选项 (1-2): ").strip()
            
            if mode_choice == "2":
                # 定时模式
                hours_str = input("\n请输入循环间隔小时 (默认 12): ").strip()
                try:
                    interval_hours = float(hours_str) if hours_str else 12.0
                except:
                    print_log("输入无效,使用默认值 12 小时", "WARN")
                    interval_hours = 12.0
                
                print(f"\n已选择: 定时循环模式 (间隔 {interval_hours} 小时)")
                run_now_str = input("是否在开始循环前立即运行一次? (y/n, 默认 y): ").strip().lower()
                run_now = run_now_str != 'n'
                
                print_log("定时任务已启动! 按 Ctrl+C 可随时停止", "INFO")
                
                loop_count = 0
                while True:
                    loop_count += 1
                    
                    if run_now or loop_count > 1:
                        print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] >>> 开始第 {loop_count} 次循环任务")
                        handle_task_execution(count, upload_mode, uploader)
                        print_log(f"第 {loop_count} 次任务完成", "INFO")
                    else:
                        print_log("跳过首次运行,直接进入等待", "INFO")

                    # 计算下次运行时间
                    next_run = datetime.now() + timedelta(hours=interval_hours)
                    print_log(f"下一次任务将在 {next_run.strftime('%Y-%m-%d %H:%M:%S')} 开始", "INFO")
                    
                    # 倒计时等待
                    total_seconds = int(interval_hours * 3600)
                    try:
                        while total_seconds > 0:
                            # 每分钟更新一次状态,显示剩余时间
                            if total_seconds % 60 == 0:
                                pass 
                            time.sleep(1)
                            total_seconds -= 1
                    except KeyboardInterrupt:
                        print("\n")
                        print_log("检测到中断, 停止定时任务", "WARN")
                        break
                        
            else:
                # 立即执行模式 (默认)
                print()
                handle_task_execution(count, upload_mode, uploader)
                
        elif choice == "2":
            print()
            uploader.view_accounts()
            
        elif choice == "3":
            print("\n再见!")
            break
            
        else:
            print_log("无效选项,请重试", "WARN")
        
        print()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n")
        print_log("用户中断程序", "INFO")
    except Exception as e:
        print_log(f"程序异常: {e}", "ERROR")
        input("\n按回车键退出...")


📌 转载信息
原作者:
zding
转载时间:
2026/1/14 18:06:03

直接产出 Dreamy-rain/gemini-business2api 所需 json 注册效率~60s 一个
日抛 直接每日重新注册即可

刚刚那个版本有点问题 已修复
已经生成的佬记得把 config_id 后缀的?csesidx=.* 清理掉重新导入

import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from urllib.parse import urlparse, parse_qs
from datetime import datetime
import time, random, json, os, requests

# 配置
TOTAL_ACCOUNTS = 20
MAIL_API = "https://mail.chatgpt.org.uk"
MAIL_KEY = "gpt-test"
OUTPUT_DIR = "gemini_accounts"
LOGIN_URL = "https://auth.business.gemini.google/login?continueUrl=https:%2F%2Fbusiness.gemini.google%2F&wiffid=CAoSJDIwNTlhYzBjLTVlMmMtNGUxZC1hY2JkLThmOGY2ZDE0ODM1Mg" # XPath
XPATH = {
    "email_input": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[1]/div[1]/div/span[2]/input",
    "continue_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/button",
    "verify_btn": "/html/body/c-wiz/div/div/div[1]/div/div/div/form/div[2]/div/div[1]/span/div[1]/button",
}

NAMES = ["James Smith", "John Johnson", "Robert Williams", "Michael Brown", "William Jones",
         "David Garcia", "Mary Miller", "Patricia Davis", "Jennifer Rodriguez", "Linda Martinez"]

def log(msg, level="INFO"): print(f"[{level}] {msg}")

def create_email():
    """创建临时邮箱""" try:
        r = requests.get(f"{MAIL_API}/api/generate-email",
            headers={"X-API-Key": MAIL_KEY}, timeout=30)
        if r.status_code == 200 and r.json().get('success'):
            email = r.json()['data']['email']
            log(f"邮箱创建: {email}")
            return email
    except Exception as e:
        log(f"创建邮箱失败: {e}", "ERR")
    return None def get_code(email, timeout=30):
    """获取验证码"""
    log(f"等待验证码 (最多{timeout}s)...")
    start = time.time()
    while time.time() - start < timeout:
        try:
            r = requests.get(f"{MAIL_API}/api/emails", params={"email": email},
                headers={"X-API-Key": MAIL_KEY}, timeout=30)
            if r.status_code == 200:
                emails = r.json().get('data', {}).get('emails', [])
                if emails:
                    html = emails[0].get('html_content') or emails[0].get('content', '')
                    soup = BeautifulSoup(html, 'html.parser')
                    span = soup.find('span', class_='verification-code')
                    if span:
                        code = span.get_text().strip()
                        if len(code) == 6:
                            log(f"验证码: {code}")
                            return code
        except: pass print(f"  等待中... ({int(time.time()-start)}s)", end='\r')
        time.sleep(3)
    log("验证码超时", "ERR")
    return None def save_config(email, cookies, url):
    """保存配置"""
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    parsed = urlparse(url)
    path_parts = url.split('/')
    config_id = None for i, p in enumerate(path_parts):
        if p == 'cid' and i+1 < len(path_parts):
            config_id = path_parts[i+1]
            # 清理 config_id 结尾的 ?csesidx=xxx if config_id and '?' in config_id:
                config_id = config_id.split('?')[0]
            break

    cookie_dict = {c['name']: c for c in cookies}
    ses_cookie = cookie_dict.get('__Secure-C_SES', {})

    data = {
        "id": email,
        "csesidx": parse_qs(parsed.query).get('csesidx', [None])[0],
        "config_id": config_id,
        "secure_c_ses": ses_cookie.get('value'),
        "host_c_oses": cookie_dict.get('__Host-C_OSES', {}).get('value'),
        "expires_at": datetime.fromtimestamp(ses_cookie.get('expiry', 0) - 43200).strftime('%Y-%m-%d %H:%M:%S') if ses_cookie.get('expiry') else None
    }

    with open(f"{OUTPUT_DIR}/{email}.json", 'w') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    log(f"配置已保存: {email}.json")
    return data

def register(driver):
    """注册单个账号"""
    email = create_email()
    if not email: return None, False, None

    wait = WebDriverWait(driver, 60)

    # 1. 访问登录页
    driver.get(LOGIN_URL)
    time.sleep(5)

    # 2. 输入邮箱
    log("输入邮箱...")
    inp = wait.until(EC.visibility_of_element_located((By.XPATH, XPATH["email_input"])))
    inp.click(); time.sleep(0.3); inp.clear(); time.sleep(0.3)
    for c in email: inp.send_keys(c); time.sleep(0.05)
    log(f"邮箱: {email}, 实际值: {inp.get_attribute('value')}")
    time.sleep(1)

    # 3. 点击继续
    btn = wait.until(EC.element_to_be_clickable((By.XPATH, XPATH["continue_btn"])))
    driver.execute_script("arguments[0].click();", btn)
    log("点击继续")
    time.sleep(3)

    # 4. 获取验证码
    code = get_code(email)
    if not code: return email, False, None # 5. 输入验证码
    time.sleep(2)
    log(f"输入验证码: {code}")
    try:
        pin = wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='pinInput']")))
        pin.click(); time.sleep(0.2)
        for c in code: pin.send_keys(c); time.sleep(0.1)
    except:
        try:
            span = driver.find_element(By.CSS_SELECTOR, "span[data-index='0']")
            span.click(); time.sleep(0.3)
            driver.switch_to.active_element.send_keys(code)
        except Exception as e:
            log(f"验证码输入失败: {e}", "ERR")
            return email, False, None # 6. 点击验证
    time.sleep(1)
    try:
        vbtn = driver.find_element(By.XPATH, XPATH["verify_btn"])
        driver.execute_script("arguments[0].click();", vbtn)
    except:
        for btn in driver.find_elements(By.TAG_NAME, "button"):
            if '验证' in btn.text: driver.execute_script("arguments[0].click();", btn); break
    log("点击验证")
    time.sleep(5)

    # 7. 输入姓名 try:
        name_inp = WebDriverWait(driver, 30).until(EC.visibility_of_element_located(
            (By.CSS_SELECTOR, "input[formcontrolname='fullName'], input[placeholder='全名'], input#mat-input-0")))
        name = random.choice(NAMES)
        name_inp.clear(); time.sleep(0.3)
        for c in name: name_inp.send_keys(c); time.sleep(0.03)
        log(f"姓名: {name}")
        from selenium.webdriver.common.keys import Keys
        name_inp.send_keys(Keys.ENTER)
    except Exception as e:
        log(f"姓名输入异常: {e}", "WARN")

    # 8. 等待进入工作台
    log("等待工作台...")
    time.sleep(6)
    for _ in range(30):
        if 'business.gemini.google' in driver.current_url and 'auth' not in driver.current_url:
            break
        time.sleep(2)
    time.sleep(3)

    # 9. 保存配置
    config = save_config(email, driver.get_cookies(), driver.current_url)
    log(f"注册成功: {email}")
    return email, True, config

def main():
    print(f"\n{'='*50}\nGemini Business 批量注册 - 共 {TOTAL_ACCOUNTS} 个\n{'='*50}\n")

    driver = uc.Chrome(options=uc.ChromeOptions(), use_subprocess=True)
    success, fail, accounts = 0, 0, []

    for i in range(TOTAL_ACCOUNTS):
        print(f"\n{'#'*40}\n注册 {i+1}/{TOTAL_ACCOUNTS}\n{'#'*40}\n")

        try:
            driver.current_url  # 检查driver是否有效 except:
            driver = uc.Chrome(options=uc.ChromeOptions(), use_subprocess=True)

        try:
            email, ok, cfg = register(driver)
            if ok: success += 1; accounts.append((email, cfg))
            else: fail += 1 except Exception as e:
            log(f"异常: {e}", "ERR"); fail += 1 try: driver.quit()
            except: pass
            driver = uc.Chrome(options=uc.ChromeOptions(), use_subprocess=True)

        print(f"\n进度: {i+1}/{TOTAL_ACCOUNTS} | 成功: {success} | 失败: {fail}")

        if i < TOTAL_ACCOUNTS - 1:
            try: driver.delete_all_cookies()
            except: pass
            time.sleep(random.randint(3, 5))

    try: driver.quit()
    except: pass print(f"\n{'='*50}\n完成! 成功: {success}, 失败: {fail}\n配置保存在: {OUTPUT_DIR}/\n{'='*50}")

if __name__ == "__main__":
    main()


感谢佬友的临时邮箱 gptmail 也感谢谷大善人不限域名大赦天下

特别感谢 2api 作者 大家也别忘了给他的帖子和仓库 star 一下~


📌 转载信息
原作者:
SnapSheep
转载时间:
2026/1/12 10:33:02

1. 这是什么?

这是一个带中英文的,使用拳头 API 和 blitz 网站数据的,抓取英雄联盟海克斯大乱斗所有英雄,出装,分级的脚本,和一些能减少选择困难的方法
本来是之前朋友叫我玩海克斯大乱斗,我回归了一下,发现挺好玩
但是搞不清符文,也不会出装备…
然后就一直想着做一个工具推荐出装备和海克斯
然后玩的太入迷了,现在已经不需要推荐了… 工具也不弄了
这里把抓数据的方法分享给大家
需要用的可以拿走,界面啥的就不发了,自用的太二了

2. 能做什么?

首先最简单的是直接参考这个 ARAM Mayhem Tier List 网站就 OK 了
千万不要下载任何 APP, 功能都很美好,但是都不支持国服,我试过了

  1. 自己弄个小程序或者网页,或者啥的,适合回归玩海克斯,但还没多少时间玩和研究的
  2. 可以用 python 批量处理为 json, 一键导入英雄联盟 (国服) 客户端内,这样游戏里就直接有推荐了
  3. 继续调试,摸鱼…

3. 文件说明 (文件打包放最后了):

  • blitz_aram_scraper.py

    1. 首选调用 Riot 的 API, 获取获取英雄的中英文名字 (主要为了解决 blitz 有时候强制中文失效)
    2. 访问 ARAM Mayhem Tier List , 在这里抓所有英雄的列表,名字,头像图片,详情链接
    3. 访问每个英雄具体的详情链接,获取该英雄的全部海克斯符文分级,装备流派分级等等数据
      因为需要点击加载全部,和切换流派,所以使用 selenium
    4. 合并为 aram_data.csv, 这里其实应该设计多表结构的,但是为了省事,就大杂烩了
    5. 其中会多线程抓取所有数据,然后图片会保存本地 (下载一次)
  • aram_data.csv -- 抓取后的所有数据

  • 技术文档_ARAM 数据结构与逻辑说明.md

    –AI 生成的数据 (aram_data) 使用说明

  • downloaded_images 文件夹

    放所有下载的英雄头像、装备图标、符文图片

  • 对照表获取.py

    拳头 API 取英雄数据的,中英文英雄名字功能融合到 blitz_aram_scraper.py 了
    单独运行会获取所有中英文物品英雄天赋数据对照表 (没有海克斯)

4. 文件打包

归档.zip

5. 问题??这个网站查隐藏分到底真的假的?非广告

网站地址,非广告!!
海克斯大乱斗数据抓取 (外服)2

6. 海克斯大乱斗通用出装 JSON (来自抖音,非推广,来自抖音一个作者,大家也可以自己制作)

文件来源!!!


抖音作者:1- 长按复制此条消息,打开抖音搜索,查看 TA 的更多作品。 https://v.douyin.com/gjLVmID0kmc/ 4@3.com :8pm
抖音号:69672512878


通用出装 json.zip

使用说明:

藏品 — 装备 — 导入


7.Tip:

  1. 为什么选择 blitz?
    因为他竟然有中英文,之前翻了一大圈网站,根本找不到全面的英雄联盟装备,符文,英雄的中英文对照 (拳头的 API 没有海克斯的中英文,也可能是我没找到?)
  2. 为什么没有国服的数据?
    因为我们有掌盟
    我看有一些小程序有国服数据,但是不知道真假,有了解的佬可以告知一下

📌 转载信息
转载时间:
2026/1/8 17:47:01

声明:本资料仅供学习交流,严禁使用于商业、非法用途!!!

之前不是发了一个备案查询的脚本嘛,详情可见[bspost cid="3384"]

有师傅反应说接口数据不太对,因为那个接口用的都是缓存库,确实太老了,新的得vip(手动狗头)

根据我的需求确实能满足,以下改的就纯单学习交流了,获取的还是比之前要全很多

另外师傅们注意,还是根据需求来写的,所以仅限根据企业名称获取备案功能!!!

还有,没去细扣js了,直接用的selenium,师傅们如果使用可能还得去装个驱动,这个直接上网搜就行,毕竟在工作(摸鱼)时间写的,发现还挺好用的,对token验证还是乱杀,也学习了一波

直接上代码吧

import re
import time
import random
import csv
from urllib import parse
from urllib.parse import quote
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from pathlib import Path
import warnings
#忽略warning
warnings.filterwarnings("ignore")

# 定义备案爬虫类
class BeianSpider(object): 
    # 获取url
    def __init__(self):
        #判断beian.csv是否存在,不存在则创建
        my_file = Path("beian.csv")
        if my_file.exists():
            print("[+]beian.csv已存在!")
        else:
            with open('beian.csv','a',newline='',encoding="utf-8") as f:
                #生成csv操作对象
                writer = csv.writer(f)
                header = ["主办单位名称","单位性质","网站备案/许可证号","网站名称","网站首页网址","审核时间","记录时间","备案域名"]
                writer.writerow(header)
                print("[+]beian.csv已创建!")

    def get_url(self,word):
        url = 'https://icp.chinaz.com/record/{}'
        
        name=quote(word, 'utf-8')
        #params = parse.urlencode(name)
        url = url.format(name)
        #print(url)
        return url
    
    # 正则行数,提取内容
    def parse_html(self,name,html):
        # 正则表达式
        re_bds = '<td>(.*?)</td><td class="tc">(.*?)</td><td>(.*?)</td><td>(.*?)</td><td class="Now"><span><a href="//(.*?)".*?<td class="tc">(.*?)</td><td class="tc">(.*?)</td><td class="tc"><a href="/record/(.*?)".*?'
        # 生成正则表达式对象
        pattern = re.compile(re_bds,re.S)
        r_list = pattern.findall(html)
        print(r_list)
        self.save_html(r_list)
        with open('success.txt','a+',newline='',encoding="utf-8") as f:
            f.write(name)
            f.write('\n')

    # 保存数据函数,使用python内置csv模块
    def save_html(self,r_list):
        #生成文件对象  
        with open('beian.csv','a',newline='',encoding="utf-8") as f:
            #生成csv操作对象
            writer = csv.writer(f)
            #整理数据
            lenth = len(r_list)
            #print(lenth)
            for i in range(lenth):
                #主办单位名称
                save_organizer_name = r_list[i][0]
                #print(name)
                #单位性质
                save_unit_nature = r_list[i][1]
                #网站备案/许可证号
                save_ICP_number = r_list[i][2]
                #网站名称
                save_website_name = r_list[i][3]
                #网站首页网址
                save_website_index = r_list[i][4]
                #审核时间
                save_review_time = r_list[i][5]
                #记录时间
                save_record_time = r_list[i][6]
                #备案域名
                save_ICP_domain = r_list[i][7]
                L = [save_organizer_name,save_unit_nature,save_ICP_number,save_website_name,save_website_index,save_review_time,save_record_time,save_ICP_domain]
                # 写入csv文件
                writer.writerow(L)
            print("[+]",r_list[0][0],"查询写入完成")
                    

    # 主函数
    def run(self):
        try:
            url = "http://icp.chinaz.com/record"
            #禁止浏览器窗口弹出
            chrome_options = webdriver.ChromeOptions()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
            browser = webdriver.Chrome(chrome_options=chrome_options)
            #browser = webdriver.Chrome()
            with open('qiye.txt','r',newline='',encoding="utf-8") as f:
                data = f.read().splitlines()
                lenth = len(data)
                for i in range(lenth):
                    #print(data[i])
                    url = self.get_url(data[i])
                    browser.get(url)
                    try:
                        #设置显示等待时间及标签,此处可改,实测受网络波动影响
                        wait = WebDriverWait(browser, 5)
                        wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'bg-list')))
                    #显示等待没有找到标签会报错,此处写入faild.txt后跳到下一个查找
                    except Exception as e:
                        with open('faild.txt','a+',newline='',encoding="utf-8") as f:
                            f.write(data[i])
                            f.write('\n')
                            print("[-]",data[i],"无备案信息")
                        continue
                    html = browser.page_source
                    self.parse_html(data[i],html)
                    #随机延时,根据需求设置
                    #time.sleep(random.uniform(1,2))
            print("[+]","All Finished")
        #except Exception:
    
        finally:
            browser.close()

# 以脚本方式启动
if __name__ == '__main__':
    #捕捉全局异常错误
    try:
        spider = BeianSpider()
        spider.run()
    except Exception as e:
        print("错误:",e)

使用还是跟之前一样,在qiye.txt里写上所有企业名称,一行一个,直接python3运行beian.py就好,然后脚本跑起来会输出到beian.csv里,同时输出success和faild的企业名,这样没找到备案的企业还能找其他站捞一波,以网上随便找的全省医院为例,结果见下图




实测还是获取的信息多一点,没vip只能查一页,所以就只获取一页了

而且实测跟网速还是有挺大关系,师傅们可以看看代码微调下参数,等待啥的

另外祝师傅们七夕快乐啊~~

有的人七夕有花有爱有对象,有的人居然从六月一直加班到现在,需坚强