Sharing an AI-written key liveness-check script – OpenAI + Gemini
Put your keys into keys.txt in the same directory as the script, one key per line. By default it probes the OpenAI /v1/models endpoint; other OpenAI-compatible endpoints can be targeted via --base-url or OPENAI_BASE_URL.
#!/usr/bin/env python3
import argparse
import json
import os
import queue
import random
import ssl
import threading
import time
import urllib.error
import urllib.request
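# --- CLI configuration ---------------------------------------------------
# All tuning knobs are exposed as flags; --base-url, --organization, and
# --project also fall back to the usual OpenAI environment variables.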
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Check OpenAI API keys and list available models per key."
    )
    parser.add_argument(
        "--input",
        default="keys.txt",
        help="Input txt file, one key per line. Default: keys.txt.",
    )
    parser.add_argument(
        "--output",
        default="valid_models.txt",
        help="Output txt file for valid keys and their models.",
    )
    parser.add_argument(
        "--bad-output",
        default="",
        help="Optional txt file for invalid keys and errors.",
    )
    parser.add_argument("--concurrency", type=int, default=64)
    parser.add_argument("--queue-size", type=int, default=4096)
    parser.add_argument("--timeout", type=float, default=12.0)
    parser.add_argument("--max-retries", type=int, default=3)
    parser.add_argument("--backoff-base", type=float, default=0.5)
    parser.add_argument("--backoff-max", type=float, default=8.0)
    parser.add_argument("--log-every", type=int, default=5000)
    parser.add_argument("--flush-every", type=int, default=1000)
    parser.add_argument(
        "--base-url",
        default=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com"),
    )
    parser.add_argument(
        "--organization",
        default=os.environ.get("OPENAI_ORG") or os.environ.get("OPENAI_ORGANIZATION"),
    )
    parser.add_argument("--project", default=os.environ.get("OPENAI_PROJECT"))
    parser.add_argument(
        "--extra-header",
        action="append",
        default=[],
        help='Extra header, format "Name: Value". Can be repeated.',
    )
    return parser.parse_args()
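# Keep only the first whitespace-separated token of each line so trailing
# annotations are ignored; blank lines and "#" comment lines are skipped.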
def normalize_key(line: str) -> str | None:
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None
    return stripped.split()[0]
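# Parse repeated --extra-header values of the form "Name: Value" into a dict.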
def parse_extra_headers(items: list[str]) -> dict[str, str]:
    headers: dict[str, str] = {}
    for item in items:
        if ":" not in item:
            raise ValueError(f"Invalid --extra-header value: {item}")
        name, value = item.split(":", 1)
        name = name.strip()
        value = value.strip()
        if not name:
            raise ValueError(f"Invalid --extra-header name: {item}")
        headers[name] = value
    return headers
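# Backoff schedule: honor a numeric Retry-After header when present,
# otherwise exponential backoff with a small jitter, capped at backoff_max.
# With the defaults (base 0.5s, cap 8s) the delays are roughly
# 0.5s, 1s, 2s for attempts 0-2, each plus up to 0.2s of jitter.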
def compute_sleep(attempt: int, backoff_base: float, backoff_max: float, retry_after: str | None) -> float:
    if retry_after:
        try:
            return min(backoff_max, float(retry_after))
        except ValueError:
            pass
    jitter = random.random() * 0.2
    return min(backoff_max, backoff_base * (2 ** attempt)) + jitter
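# Probe a single key with GET {base_url}/v1/models. Returns a tuple of
# (models, status, detail): status is "ok", "unauthorized" (401/403), or an
# error class; 429 and 5xx responses are retried with backoff.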
def fetch_models_for_key(
    key: str,
    base_url: str,
    base_headers: dict[str, str],
    timeout: float,
    max_retries: int,
    backoff_base: float,
    backoff_max: float,
    ssl_context: ssl.SSLContext,
) -> tuple[list[str] | None, str, str]:
    headers = dict(base_headers)
    headers["Authorization"] = f"Bearer {key}"
    url = f"{base_url}/v1/models"
    for attempt in range(max_retries + 1):
        try:
            req = urllib.request.Request(url, headers=headers, method="GET")
            with urllib.request.urlopen(req, timeout=timeout, context=ssl_context) as resp:
                status = resp.getcode()
                body = resp.read()
                if status == 200:
                    try:
                        payload = json.loads(body)
                    except json.JSONDecodeError:
                        return None, "invalid_json", ""
                    data = payload.get("data")
                    if not isinstance(data, list):
                        return None, "invalid_response", ""
                    models = sorted(
                        {item.get("id") for item in data if isinstance(item, dict) and item.get("id")}
                    )
                    return models, "ok", ""
                # urlopen raises HTTPError for non-2xx responses, so the
                # checks below are a defensive fallback; errors are normally
                # classified in the HTTPError handler.
                if status in (401, 403):
                    return None, "unauthorized", f"http_{status}"
                if status == 429 or status >= 500:
                    if attempt < max_retries:
                        time.sleep(compute_sleep(attempt, backoff_base, backoff_max, None))
                        continue
                    return None, "retry_exhausted", f"http_{status}"
                return None, "http_error", f"http_{status}"
        except urllib.error.HTTPError as exc:
            status = exc.code
            retry_after = exc.headers.get("Retry-After") if exc.headers else None
            if status in (401, 403):
                return None, "unauthorized", f"http_{status}"
            if status == 429 or status >= 500:
                if attempt < max_retries:
                    time.sleep(compute_sleep(attempt, backoff_base, backoff_max, retry_after))
                    continue
                return None, "retry_exhausted", f"http_{status}"
            return None, "http_error", f"http_{status}"
        except urllib.error.URLError as exc:
            if attempt < max_retries:
                time.sleep(compute_sleep(attempt, backoff_base, backoff_max, None))
                continue
            return None, "network_error", str(exc)
        except Exception as exc:  # noqa: BLE001
            return None, "exception", str(exc)
    return None, "retry_exhausted", "unknown"
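# Thread-safe progress counters; prints a rate/ETA line every log_every keys.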
class Stats:
    def __init__(self, log_every: int) -> None:
        self.log_every = log_every
        self.lock = threading.Lock()
        self.processed = 0
        self.ok = 0
        self.bad = 0
        self.error = 0
        self.total = 0
        self.start = time.time()

    def set_total(self, total: int) -> None:
        with self.lock:
            self.total = total

    def update(self, status: str) -> None:
        with self.lock:
            self.processed += 1
            if status == "ok":
                self.ok += 1
            elif status == "unauthorized":
                self.bad += 1
            else:
                self.error += 1
            if self.log_every > 0 and self.processed % self.log_every == 0:
                elapsed = max(time.time() - self.start, 0.001)
                rate = self.processed / elapsed
                if self.total > 0:
                    remaining = max(self.total - self.processed, 0)
                    eta = remaining / rate if rate > 0 else 0.0
                    pct = (self.processed / self.total) * 100
                    print(
                        f"processed={self.processed}/{self.total} "
                        f"({pct:.1f}%) ok={self.ok} bad={self.bad} "
                        f"error={self.error} rate={rate:.1f}/s eta={eta:.0f}s",
                        flush=True,
                    )
                else:
                    print(
                        f"processed={self.processed} ok={self.ok} "
                        f"bad={self.bad} error={self.error} rate={rate:.1f}/s",
                        flush=True,
                    )
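# Worker threads pull keys from key_queue until they see the None sentinel,
# check each key, and hand results to the single writer via result_queue.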
def worker(
    key_queue: queue.Queue,
    result_queue: queue.Queue,
    stats: Stats,
    base_url: str,
    base_headers: dict[str, str],
    timeout: float,
    max_retries: int,
    backoff_base: float,
    backoff_max: float,
    ssl_context: ssl.SSLContext,
) -> None:
    while True:
        key = key_queue.get()
        try:
            if key is None:
                return
            models, status, detail = fetch_models_for_key(
                key=key,
                base_url=base_url,
                base_headers=base_headers,
                timeout=timeout,
                max_retries=max_retries,
                backoff_base=backoff_base,
                backoff_max=backoff_max,
                ssl_context=ssl_context,
            )
            result_queue.put((key, models, status, detail))
            stats.update(status)
        finally:
            key_queue.task_done()
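# A single writer thread owns both output files, so no file locking is
# needed. Valid keys are written as "key<TAB>model1,model2,..."; invalid
# keys (when --bad-output is set) as "key<TAB>status<TAB>detail".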
def writer(
    result_queue: queue.Queue,
    output_path: str,
    bad_output_path: str | None,
    flush_every: int,
) -> None:
    out_fp = open(output_path, "w", encoding="utf-8")
    bad_fp = open(bad_output_path, "w", encoding="utf-8") if bad_output_path else None
    written = 0
    bad_written = 0
    try:
        while True:
            item = result_queue.get()
            try:
                if item is None:
                    return
                key, models, status, detail = item
                if status == "ok" and models is not None:
                    out_fp.write(f"{key}\t{','.join(models)}\n")
                    written += 1
                    if flush_every > 0 and written % flush_every == 0:
                        out_fp.flush()
                elif bad_fp is not None:
                    bad_fp.write(f"{key}\t{status}\t{detail}\n")
                    bad_written += 1
                    if flush_every > 0 and bad_written % flush_every == 0:
                        bad_fp.flush()
            finally:
                result_queue.task_done()
    finally:
        out_fp.flush()
        out_fp.close()
        if bad_fp is not None:
            bad_fp.flush()
            bad_fp.close()
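# Wiring: main() starts the writer and worker threads, then acts as the
# producer, streaming keys from the input file into the bounded key_queue
# (so arbitrarily large key files never load fully into memory). Shutdown
# is sentinel-based: one None per worker, then one None for the writer.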
def main() -> int:
    args = parse_args()
    if args.concurrency <= 0:
        raise SystemExit("--concurrency must be > 0")
    if args.queue_size <= 0:
        raise SystemExit("--queue-size must be > 0")
    base_url = args.base_url.rstrip("/")
    base_headers = {"Accept": "application/json"}
    if args.organization:
        base_headers["OpenAI-Organization"] = args.organization
    if args.project:
        base_headers["OpenAI-Project"] = args.project
    if args.extra_header:
        base_headers.update(parse_extra_headers(args.extra_header))
    key_queue: queue.Queue = queue.Queue(maxsize=args.queue_size)
    result_queue: queue.Queue = queue.Queue(maxsize=args.queue_size)
    stats = Stats(log_every=args.log_every)
    ssl_context = ssl.create_default_context()
    bad_output = args.bad_output.strip() or None
    writer_thread = threading.Thread(
        target=writer,
        args=(result_queue, args.output, bad_output, args.flush_every),
        daemon=True,
    )
    writer_thread.start()
    print(
        "start "
        f"input={args.input} output={args.output} bad_output={bad_output or '-'} "
        f"concurrency={args.concurrency} queue_size={args.queue_size} "
        f"timeout={args.timeout} max_retries={args.max_retries}",
        flush=True,
    )
    workers: list[threading.Thread] = []
    for _ in range(args.concurrency):
        t = threading.Thread(
            target=worker,
            args=(
                key_queue,
                result_queue,
                stats,
                base_url,
                base_headers,
                args.timeout,
                args.max_retries,
                args.backoff_base,
                args.backoff_max,
                ssl_context,
            ),
            daemon=True,
        )
        t.start()
        workers.append(t)
    queued = 0
    with open(args.input, "r", encoding="utf-8", errors="ignore") as handle:
        for line in handle:
            key = normalize_key(line)
            if not key:
                continue
            key_queue.put(key)
            queued += 1
            if args.log_every > 0 and queued % args.log_every == 0:
                print(f"queued={queued}", flush=True)
    stats.set_total(queued)
    print(f"queued_total={queued}", flush=True)
    for _ in range(args.concurrency):
        key_queue.put(None)
    key_queue.join()
    for t in workers:
        t.join()
    result_queue.join()
    result_queue.put(None)
    writer_thread.join()
    elapsed = max(time.time() - stats.start, 0.001)
    print(
        f"done processed={stats.processed} ok={stats.ok} bad={stats.bad} "
        f"error={stats.error} seconds={elapsed:.1f}",
        flush=True,
    )
    return 0
if __name__ == "__main__":
    raise SystemExit(main())
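A typical invocation, assuming the script is saved as check_keys.py (the file name here is just an example):

python3 check_keys.py --input keys.txt --output valid_models.txt --bad-output bad_keys.txt --concurrency 64

Any endpoint that serves GET <base>/v1/models with Bearer authentication can be targeted by overriding --base-url (or the OPENAI_BASE_URL environment variable).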
Related: https://linux.do/t/topic/1496409/26