闲着也是闲着,让 Gemini 和 Claude 帮忙糊了一个脚本,用来把 gcli2api 的凭证文件转换为 cliproxyapi 格式。

代码:

import json
import sys
import re
import os
import argparse
import asyncio
import httpx
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, Tuple
 
 
class ConversionError(Exception):
    """Conversion error raised when a step of the credential conversion fails."""
 
 
def extract_email_from_filename(filename: str) -> Optional[str]:
    """Return the first e-mail address embedded in a file name (fallback source).

    A trailing ``.json`` extension (matched case-insensitively) is stripped
    before searching so that it cannot be picked up as part of the address.

    Args:
        filename: File name to scan.

    Returns:
        The first substring matching the e-mail pattern, or ``None`` when the
        name contains no such substring.
    """
    stem = filename
    if filename.lower().endswith(".json"):
        # Drop the five characters of ".json".
        stem = filename[:-5]

    found = re.search(r"([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})", stem)
    if found is None:
        return None
    return found.group(1)
 
 
async def refresh_token_and_get_info(
    client_id: str,
    client_secret: str,
    refresh_token: str
) -> Tuple[str, str, int]:
    """Exchange a refresh token for a new access token.

    Sends a ``refresh_token`` grant to ``https://oauth2.googleapis.com/token``
    and computes the absolute expiry from the ``expires_in`` value of the
    response (3599 seconds is used when the field is absent).

    Args:
        client_id: OAuth client id.
        client_secret: OAuth client secret.
        refresh_token: Refresh token to exchange.

    Returns:
        ``(access_token, expiry, expires_in)`` where ``expiry`` is a
        timezone-aware UTC timestamp in ISO format and ``expires_in`` is the
        token lifetime in seconds.

    Raises:
        ConversionError: On a non-success HTTP status, a transport error, or
            a response body that is not JSON, lacks ``access_token`` or has a
            non-numeric ``expires_in``.
    """
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                "https://oauth2.googleapis.com/token",
                data={
                    "client_id": client_id,
                    "client_secret": client_secret,
                    "refresh_token": refresh_token,
                    "grant_type": "refresh_token",
                },
                headers={"Content-Type": "application/x-www-form-urlencoded"}
            )
            resp.raise_for_status()
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 400:
            raise ConversionError(
                f"刷新 token 失败(可能 refresh_token 已失效): {e.response.text}"
            ) from e
        raise ConversionError(
            f"刷新 token 失败 (HTTP {e.response.status_code}): {e.response.text}"
        ) from e
    except httpx.HTTPError as e:
        raise ConversionError(f"刷新 token 网络错误: {e}") from e

    # A malformed success response must surface as ConversionError as well,
    # so callers that only handle ConversionError see every failure mode.
    try:
        token_data = resp.json()
        new_access_token = token_data["access_token"]
        expires_in = int(token_data.get("expires_in", 3599))
    except (ValueError, KeyError, TypeError, AttributeError) as e:
        raise ConversionError(f"刷新 token 响应格式无效: {e!r}") from e

    # Absolute expiry as an ISO timestamp in UTC.
    expires_at = datetime.now(timezone.utc) + timedelta(seconds=expires_in)
    return new_access_token, expires_at.isoformat(), expires_in
 
 
def is_token_expired(expiry: Optional[str]) -> bool:
    """Tell whether a token expiry timestamp is in the past (or nearly so).

    Args:
        expiry: ISO formatted timestamp; a trailing ``Z`` is accepted. A value
            without timezone information is interpreted as UTC.

    Returns:
        ``True`` when ``expiry`` is empty, cannot be parsed, or lies no more
        than 60 seconds in the future; ``False`` otherwise.
    """
    if not expiry:
        return True

    try:
        parsed = datetime.fromisoformat(expiry.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # Naive timestamps are assumed to be UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)

        # 60-second safety margin: treat a token about to expire as expired.
        cutoff = datetime.now(timezone.utc) + timedelta(seconds=60)
        return parsed <= cutoff
    except Exception:
        # Anything unparseable counts as expired.
        return True
 
 
async def get_user_email(access_token: str) -> str:
    """Fetch the account e-mail from Google's userinfo endpoint.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The ``email`` value of the JSON response.

    Raises:
        ConversionError: On a non-success HTTP status, a transport error, a
            body that is not valid JSON, or a response without a non-empty
            ``email`` field.
    """
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(
                "https://www.googleapis.com/oauth2/v2/userinfo",
                headers={"Authorization": f"Bearer {access_token}"}
            )
            resp.raise_for_status()
    except httpx.HTTPStatusError as e:
        raise ConversionError(
            f"获取邮箱失败 (HTTP {e.response.status_code}): {e.response.text}"
        ) from e
    except httpx.HTTPError as e:
        raise ConversionError(f"获取邮箱网络错误: {e}") from e

    # Report an undecodable body as ConversionError too, so that callers
    # handling only ConversionError are not bypassed by a ValueError.
    try:
        data = resp.json()
    except ValueError as e:
        raise ConversionError(f"获取邮箱失败 (响应不是有效的 JSON): {e}") from e

    email = data.get("email") if isinstance(data, dict) else None
    if not email:
        raise ConversionError("API 响应中没有 email 字段")
    return email
 
 
async def fetch_project_id(access_token: str) -> Optional[str]:
    """Best-effort lookup of the project id through a ``loadCodeAssist`` call.

    The id is taken from the segment following ``projects/`` in the response's
    ``name`` field when present, otherwise from the first of ``projectId``,
    ``project_id`` or ``projectNumber`` found in the response.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The project id as a string, or ``None`` when the status is not 200,
        no candidate field exists, or any exception occurs.
    """
    try:
        async with httpx.AsyncClient(timeout=30.0) as client:
            # NOTE(review): endpoint URL is kept as found; it has not been
            # verified against any API reference - confirm before relying on it.
            resp = await client.post(
                "https://generativelanguage.googleapis.com/v1alpha/models/code-gecko:loadCodeAssist",
                headers={
                    "Authorization": f"Bearer {access_token}",
                    "Content-Type": "application/json",
                },
                json={}
            )

            if resp.status_code != 200:
                return None

            payload = resp.json()

            # Preferred source: a resource name such as "projects/<id>/...".
            if "name" in payload:
                resource = payload["name"]
                if "projects/" in resource:
                    return resource.split("projects/")[1].split("/")[0]

            # Otherwise take the first plain field that is present.
            present = next(
                (key for key in ("projectId", "project_id", "projectNumber") if key in payload),
                None,
            )
            if present is None:
                return None
            return str(payload[present])

    except Exception:
        # Deliberately best-effort: the caller falls back when None is returned.
        return None
 
 
async def convert_file_async(input_path: str, output_dir: str, use_filename_email: bool = True):
    """Convert one credential JSON file and write the result to ``output_dir``.

    Steps:
        1. Refresh the access token when it is missing or ``is_token_expired``
           reports the stored expiry as expired.
        2. Determine the account e-mail: from the file name when allowed,
           otherwise from the userinfo API (one retry after a token refresh).
        3. Use the stored ``project_id`` or, when it is missing or a
           placeholder, try ``fetch_project_id`` and finally fall back to
           ``unknown-<local part of the e-mail>``.
        4. Write ``<email>-<project_id>.json`` (characters illegal in file
           names replaced by ``_``) containing the ``token`` object plus
           ``project_id``, ``email``, ``auto``, ``checked`` and ``type``.

    Problems are printed instead of raised; the function then returns
    without writing an output file.

    Args:
        input_path: Path of the source credential file.
        output_dir: Directory for the converted file; created when needed.
        use_filename_email: When true, an e-mail found in the file name is
            used and the userinfo API is only called if none is found.

    Returns:
        None.
    """
    filename = os.path.basename(input_path)
    print(f"\n处理文件: {filename}")

    try:
        with open(input_path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception as e:
        print(f"  [错误] 读取文件失败: {e}")
        return

    # Valid JSON is not necessarily an object. A top-level list, string or
    # number has no .get(), which would raise AttributeError out of this
    # coroutine before any of the error handling below is reached.
    if not isinstance(data, dict):
        print(f"  [错误] 文件内容不是 JSON 对象 (实际类型: {type(data).__name__})")
        return

    # Required fields.
    access_token = data.get("access_token") or data.get("token")
    client_id = data.get("client_id")
    client_secret = data.get("client_secret")
    refresh_token = data.get("refresh_token")

    if not all([client_id, client_secret, refresh_token]):
        print("  [错误] 缺少必要的 OAuth 字段 (client_id, client_secret, refresh_token)")
        return

    try:
        # 1. Check the token and refresh it when necessary.
        print("  [1/4] 检查 token 有效性...")
        expiry = data.get("expiry")
        # Default lifetime; only replaced by a real value when a refresh happens.
        expires_in = 3599

        if is_token_expired(expiry):
            print("  ⚠ Token 已过期或无效,正在刷新...")
            access_token, expiry, expires_in = await refresh_token_and_get_info(
                client_id, client_secret, refresh_token
            )
            print(f"  ✓ Token 已刷新,新过期时间: {expiry}")
        else:
            print(f"  ✓ Token 有效,过期时间: {expiry}")
            if not access_token:
                # The expiry looks fine but there is no token to keep: force a refresh.
                print("  ⚠ 缺少 access_token,正在刷新...")
                access_token, expiry, expires_in = await refresh_token_and_get_info(
                    client_id, client_secret, refresh_token
                )
                print("  ✓ Token 已刷新")

        # 2. Determine the e-mail (file name first, API only as a fallback).
        print("  [2/4] 获取用户邮箱...")
        email = None

        if use_filename_email:
            email = extract_email_from_filename(filename)
            if email:
                print(f"  ✓ 从文件名提取邮箱: {email}")

        if not email:
            try:
                email = await get_user_email(access_token)
                print(f"  ✓ 从 API 获取邮箱: {email}")
            except ConversionError:
                # The stored token may be rejected even though its expiry
                # looked valid: refresh once and try again.
                print("  ⚠ 首次获取邮箱失败,刷新 token 后重试...")
                try:
                    access_token, expiry, expires_in = await refresh_token_and_get_info(
                        client_id, client_secret, refresh_token
                    )
                    email = await get_user_email(access_token)
                    print(f"  ✓ 从 API 获取邮箱: {email}")
                except Exception as retry_e:
                    print(f"  [错误] 无法获取邮箱: {retry_e}")
                    return

        if not email:
            print("  [错误] 无法确定邮箱地址")
            return

        # 3. Obtain or validate the project id.
        print("  [3/4] 处理 project_id...")
        project_id = data.get("project_id")

        if not project_id or project_id in ["unknown_project", "null", ""]:
            print("  ⚠ project_id 无效,尝试从 API 获取...")
            fetched_project_id = await fetch_project_id(access_token)
            if fetched_project_id:
                project_id = fetched_project_id
                print(f"  ✓ 从 API 获取到 project_id: {project_id}")
            else:
                # Last resort: derive an id from the local part of the e-mail.
                project_id = f"unknown-{email.split('@')[0]}"
                print(f"  ⚠ 无法从 API 获取,使用 fallback: {project_id}")
        else:
            print(f"  ✓ project_id: {project_id}")

        # 4. Build the output document.
        print("  [4/4] 生成输出文件...")

        scopes = data.get("scopes", [])
        if not scopes:
            # Default scopes used when the source file lists none.
            scopes = [
                "openid",
                "https://www.googleapis.com/auth/userinfo.email",
                "https://www.googleapis.com/auth/cloud-platform",
                "https://www.googleapis.com/auth/generative-language.retriever"
            ]

        new_data = {
            "token": {
                "access_token": access_token,
                "client_id": client_id,
                "client_secret": client_secret,
                "expires_in": expires_in,
                "expiry": expiry,
                "refresh_token": refresh_token,
                "scopes": scopes,
                "token_type": "Bearer",
                "token_uri": data.get("token_uri", "https://oauth2.googleapis.com/token"),
                "universe_domain": "googleapis.com",
            },
            "project_id": project_id,
            "email": email,
            "auto": False,
            "checked": True,
            "type": "gemini",
        }

        # Replace characters that are illegal in file names before saving.
        output_filename = f"{email}-{project_id}.json"
        output_filename = re.sub(r'[<>:"/\\|?*]', '_', output_filename)
        output_path = os.path.join(output_dir, output_filename)

        os.makedirs(output_dir, exist_ok=True)
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(new_data, f, separators=(",", ":"), ensure_ascii=False)

        print(f"  ✓ 转换成功: {output_filename}")

    except ConversionError as e:
        print(f"  [错误] {e}")
    except Exception as e:
        print(f"  [错误] 转换失败: {e}")
        import traceback
        traceback.print_exc()
 
 
async def process_path_async(input_path: str, output_dir: str, use_filename_email: bool = True, max_concurrent: int = 10):
    """Convert a single file, or every ``*.json`` file below a directory.

    Directory input is walked recursively and the files are converted
    concurrently, with at most ``max_concurrent`` conversions in flight.

    Args:
        input_path: File or directory to process.
        output_dir: Directory that receives the converted files.
        use_filename_email: Passed through to ``convert_file_async``.
        max_concurrent: Upper bound on simultaneous conversions; values below
            1 are treated as 1.

    Returns:
        None. A missing path, an empty directory and per-file failures are
        reported with ``print``.
    """
    if os.path.isfile(input_path):
        await convert_file_async(input_path, output_dir, use_filename_email)
        return

    if not os.path.isdir(input_path):
        print(f"错误: 输入路径不存在: {input_path}")
        return

    print(f"处理目录: {input_path}")
    json_files = [
        os.path.join(root, name)
        for root, _, names in os.walk(input_path)
        for name in names
        if name.lower().endswith(".json")
    ]

    if not json_files:
        print("目录中没有找到 JSON 文件")
        return

    # Semaphore(0) would block every task forever and a negative value raises
    # ValueError, so always allow at least one conversion at a time.
    limit = max(1, max_concurrent)

    print(f"找到 {len(json_files)} 个 JSON 文件")
    print(f"并发数: {limit}")

    semaphore = asyncio.Semaphore(limit)

    async def process_with_semaphore(file_path):
        async with semaphore:
            await convert_file_async(file_path, output_dir, use_filename_email)

    results = await asyncio.gather(
        *(process_with_semaphore(path) for path in json_files),
        return_exceptions=True,
    )

    # return_exceptions=True keeps one failing file from aborting the batch,
    # but the collected exceptions still have to be shown to the user.
    for path, outcome in zip(json_files, results):
        if isinstance(outcome, BaseException):
            print(f"  [错误] 处理 {path} 时出现未处理的异常: {outcome!r}")
 
 
def main():
    """Command-line entry point: parse the arguments and run the conversion.

    Arguments: a positional ``input_path``, ``-o/--output_dir`` (default
    ``converted_output``), ``--no-filename-email`` and ``-c/--concurrent``
    (default 10). A banner is printed before and after processing.
    """
    cli = argparse.ArgumentParser(description="将 gcli2api 的凭证文件转换为 cliproxyapi 格式")
    cli.add_argument("input_path", help="输入文件或目录路径")
    cli.add_argument("-o", "--output_dir", default="converted_output", help="输出目录路径")
    cli.add_argument(
        "--no-filename-email",
        action="store_true",
        help="禁用从文件名提取邮箱(总是调用 API)",
    )
    cli.add_argument("-c", "--concurrent", type=int, default=10, help="最大并发数(默认 10)")
    options = cli.parse_args()

    rule = "=" * 60
    print(rule)
    print("gcli2api → cliproxyapi 凭证转换工具")
    print(rule)

    # The flag disables file-name extraction, hence the negation.
    job = process_path_async(
        options.input_path,
        options.output_dir,
        use_filename_email=not options.no_filename_email,
        max_concurrent=options.concurrent,
    )
    asyncio.run(job)

    print("\n" + rule)
    print("处理完成")
    print(rule)
 
 
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

使用示例:

# 默认模式(从文件名提取邮箱,并发 10)
python convert.py ./credentials -o ./output
 
# 强制调用 API 获取邮箱
python convert.py ./credentials -o ./output --no-filename-email
 
# 调整并发数
python convert.py ./credentials -o ./output -c 20

📌 转载信息
原作者:
DSLZL
转载时间:
2026/1/20 17:39:59

标签: python, Gemini CLI, CLI Proxy API, Google OAuth, API Development

添加新评论