GCLI2API Gemini CLI 转 CLI Proxy API Gemini CLI
闲着也是闲着,让 Gemini 和 Claude 糊的
代码:
import json
import sys
import re
import os
import argparse
import asyncio
import httpx
from datetime import datetime, timezone, timedelta
from typing import Optional, Dict, Any, Tuple
class ConversionError(Exception):
"""转换错误"""
pass
def extract_email_from_filename(filename: str) -> Optional[str]:
"""从文件名提取邮箱(作为 fallback)"""
# 移除扩展名
name = filename[:-5] if filename.lower().endswith(".json") else filename
# 匹配邮箱
email_pattern = r"([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})"
match = re.search(email_pattern, name)
return match.group(1) if match else None
async def refresh_token_and_get_info(
client_id: str,
client_secret: str,
refresh_token: str
) -> Tuple[str, str, int]:
"""
刷新 token 并获取新的 access_token、expiry 和 expires_in
Returns:
(access_token, expiry_iso_string, expires_in_seconds)
"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
"https://oauth2.googleapis.com/token",
data={
"client_id": client_id,
"client_secret": client_secret,
"refresh_token": refresh_token,
"grant_type": "refresh_token",
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
resp.raise_for_status()
token_data = resp.json()
new_access_token = token_data["access_token"]
expires_in = int(token_data.get("expires_in", 3599))
# 计算 expiry(ISO 格式)
current_utc = datetime.now(timezone.utc)
expires_at = current_utc + timedelta(seconds=expires_in)
expiry = expires_at.isoformat()
return new_access_token, expiry, expires_in
except httpx.HTTPStatusError as e:
if e.response.status_code == 400:
raise ConversionError(f"刷新 token 失败(可能 refresh_token 已失效): {e.response.text}")
raise ConversionError(f"刷新 token 失败 (HTTP {e.response.status_code}): {e.response.text}")
except httpx.HTTPError as e:
raise ConversionError(f"刷新 token 网络错误: {e}")
def is_token_expired(expiry: Optional[str]) -> bool:
"""检查 token 是否过期"""
if not expiry:
return True
try:
# 解析 expiry 时间
expiry_str = expiry.replace("Z", "+00:00")
expiry_dt = datetime.fromisoformat(expiry_str)
# 如果没有时区信息,假设为 UTC
if expiry_dt.tzinfo is None:
expiry_dt = expiry_dt.replace(tzinfo=timezone.utc)
# 提前 60 秒判定过期(安全边界)
now = datetime.now(timezone.utc)
return expiry_dt <= (now + timedelta(seconds=60))
except Exception as e:
# 解析失败,认为已过期
return True
async def get_user_email(access_token: str) -> str:
"""调用 Google API 获取真实邮箱"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(
"https://www.googleapis.com/oauth2/v2/userinfo",
headers={"Authorization": f"Bearer {access_token}"}
)
resp.raise_for_status()
data = resp.json()
email = data.get("email")
if not email:
raise ConversionError("API 响应中没有 email 字段")
return email
except httpx.HTTPStatusError as e:
raise ConversionError(f"获取邮箱失败 (HTTP {e.response.status_code}): {e.response.text}")
except httpx.HTTPError as e:
raise ConversionError(f"获取邮箱网络错误: {e}")
async def fetch_project_id(access_token: str) -> Optional[str]:
"""通过 loadCodeAssist API 获取 project_id"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
"https://generativelanguage.googleapis.com/v1alpha/models/code-gecko:loadCodeAssist",
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
json={}
)
if resp.status_code == 200:
data = resp.json()
# 从响应中提取 project_id
# 格式通常是 "projects/PROJECT_ID" 或直接是字段
if "name" in data:
name = data["name"]
if "projects/" in name:
parts = name.split("projects/")
if len(parts) > 1:
project_id = parts[1].split("/")[0]
return project_id
# 尝试其他字段
for field in ["projectId", "project_id", "projectNumber"]:
if field in data:
return str(data[field])
return None
except Exception:
return None
async def convert_file_async(input_path: str, output_dir: str, use_filename_email: bool = True):
"""异步转换单个文件"""
filename = os.path.basename(input_path)
print(f"\n处理文件: {filename}")
try:
with open(input_path, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
print(f" [错误] 读取文件失败: {e}")
return
# 获取必要字段
access_token = data.get("access_token") or data.get("token")
client_id = data.get("client_id")
client_secret = data.get("client_secret")
refresh_token = data.get("refresh_token")
if not all([client_id, client_secret, refresh_token]):
print(f" [错误] 缺少必要的 OAuth 字段 (client_id, client_secret, refresh_token)")
return
try:
# 1. 先检查并刷新 token(如果需要)
print(f" [1/4] 检查 token 有效性...")
expiry = data.get("expiry")
expires_in = 3599 # 默认值
if is_token_expired(expiry):
print(f" ⚠ Token 已过期或无效,正在刷新...")
access_token, expiry, expires_in = await refresh_token_and_get_info(
client_id, client_secret, refresh_token
)
print(f" ✓ Token 已刷新,新过期时间: {expiry}")
else:
print(f" ✓ Token 有效,过期时间: {expiry}")
# 保留原 access_token 和 expiry
if not access_token:
# 如果没有 access_token,强制刷新
print(f" ⚠ 缺少 access_token,正在刷新...")
access_token, expiry, expires_in = await refresh_token_and_get_info(
client_id, client_secret, refresh_token
)
print(f" ✓ Token 已刷新")
# 2. 获取邮箱(优先使用文件名,失败才调用 API)
print(f" [2/4] 获取用户邮箱...")
email = None
# 如果允许,先尝试从文件名提取
if use_filename_email:
email = extract_email_from_filename(filename)
if email:
print(f" ✓ 从文件名提取邮箱: {email}")
# 如果文件名没有邮箱,调用 API
if not email:
try:
email = await get_user_email(access_token)
print(f" ✓ 从 API 获取邮箱: {email}")
except ConversionError as e:
# API 调用失败,尝试再次刷新 token 后重试
print(f" ⚠ 首次获取邮箱失败,刷新 token 后重试...")
try:
access_token, expiry, expires_in = await refresh_token_and_get_info(
client_id, client_secret, refresh_token
)
email = await get_user_email(access_token)
print(f" ✓ 从 API 获取邮箱: {email}")
except Exception as retry_e:
print(f" [错误] 无法获取邮箱: {retry_e}")
return
if not email:
print(f" [错误] 无法确定邮箱地址")
return
# 3. 获取或验证 project_id
print(f" [3/4] 处理 project_id...")
project_id = data.get("project_id")
if not project_id or project_id in ["unknown_project", "null", ""]:
print(f" ⚠ project_id 无效,尝试从 API 获取...")
fetched_project_id = await fetch_project_id(access_token)
if fetched_project_id:
project_id = fetched_project_id
print(f" ✓ 从 API 获取到 project_id: {project_id}")
else:
# 使用邮箱的用户名部分作为 fallback
project_id = f"unknown-{email.split('@')[0]}"
print(f" ⚠ 无法从 API 获取,使用 fallback: {project_id}")
else:
print(f" ✓ project_id: {project_id}")
# 4. 构建输出数据
print(f" [4/4] 生成输出文件...")
# 获取 scopes
scopes = data.get("scopes", [])
if not scopes:
# 默认 scopes
scopes = [
"openid",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/generative-language.retriever"
]
new_data = {
"token": {
"access_token": access_token,
"client_id": client_id,
"client_secret": client_secret,
"expires_in": expires_in,
"expiry": expiry,
"refresh_token": refresh_token,
"scopes": scopes,
"token_type": "Bearer",
"token_uri": data.get("token_uri", "https://oauth2.googleapis.com/token"),
"universe_domain": "googleapis.com",
},
"project_id": project_id,
"email": email,
"auto": False,
"checked": True,
"type": "gemini",
}
# 保存文件
output_filename = f"{email}-{project_id}.json"
# 清理文件名中的非法字符
output_filename = re.sub(r'[<>:"/\\|?*]', '_', output_filename)
output_path = os.path.join(output_dir, output_filename)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
json.dump(new_data, f, separators=(",", ":"), ensure_ascii=False)
print(f" ✓ 转换成功: {output_filename}")
except ConversionError as e:
print(f" [错误] {e}")
except Exception as e:
print(f" [错误] 转换失败: {e}")
import traceback
traceback.print_exc()
async def process_path_async(input_path: str, output_dir: str, use_filename_email: bool = True, max_concurrent: int = 10):
"""异步处理文件或目录"""
if os.path.isfile(input_path):
await convert_file_async(input_path, output_dir, use_filename_email)
elif os.path.isdir(input_path):
print(f"处理目录: {input_path}")
json_files = []
for root, _, files in os.walk(input_path):
for file in files:
if file.lower().endswith(".json"):
json_files.append(os.path.join(root, file))
if not json_files:
print(f"目录中没有找到 JSON 文件")
return
print(f"找到 {len(json_files)} 个 JSON 文件")
print(f"并发数: {max_concurrent}")
# 限制并发数
semaphore = asyncio.Semaphore(max_concurrent)
async def process_with_semaphore(file_path):
async with semaphore:
await convert_file_async(file_path, output_dir, use_filename_email)
tasks = [process_with_semaphore(f) for f in json_files]
await asyncio.gather(*tasks, return_exceptions=True)
else:
print(f"错误: 输入路径不存在: {input_path}")
def main():
parser = argparse.ArgumentParser(
description="将 gcli2api 的凭证文件转换为 cliproxyapi 格式"
)
parser.add_argument("input_path", help="输入文件或目录路径")
parser.add_argument(
"-o", "--output_dir",
help="输出目录路径",
default="converted_output"
)
parser.add_argument(
"--no-filename-email",
action="store_true",
help="禁用从文件名提取邮箱(总是调用 API)"
)
parser.add_argument(
"-c", "--concurrent",
type=int,
default=10,
help="最大并发数(默认 10)"
)
args = parser.parse_args()
print("=" * 60)
print("gcli2api → cliproxyapi 凭证转换工具")
print("=" * 60)
asyncio.run(
process_path_async(
args.input_path,
args.output_dir,
use_filename_email=not args.no_filename_email,
max_concurrent=args.concurrent
)
)
print("\n" + "=" * 60)
print("处理完成")
print("=" * 60)
if __name__ == "__main__":
main()
使用示例:
# 默认模式(从文件名提取邮箱,并发 10)
python convert.py ./credentials -o ./output
# 强制调用 API 获取邮箱
python convert.py ./credentials -o ./output --no-filename-email
# 调整并发数
python convert.py ./credentials -o ./output -c 20
