# Download the keys into keys.txt in the same directory as this script.

#!/usr/bin/env python3
import argparse
import json
import os
import queue
import random
import ssl
import threading
import time
import urllib.error
import urllib.request


def parse_args() -> argparse.Namespace:
    """Build and parse the command-line interface for the key checker."""
    ap = argparse.ArgumentParser(
        description="Check OpenAI API keys and list available models per key."
    )
    ap.add_argument(
        "--input",
        default="keys.txt",
        help="Input txt file, one key per line. Default: keys.txt.",
    )
    ap.add_argument(
        "--output",
        default="valid_models.txt",
        help="Output txt file for valid keys and their models.",
    )
    ap.add_argument(
        "--bad-output",
        default="",
        help="Optional txt file for invalid keys and errors.",
    )
    # Numeric tuning knobs: concurrency, retry/backoff and progress cadence.
    for flag, converter, fallback in (
        ("--concurrency", int, 64),
        ("--queue-size", int, 4096),
        ("--timeout", float, 12.0),
        ("--max-retries", int, 3),
        ("--backoff-base", float, 0.5),
        ("--backoff-max", float, 8.0),
        ("--log-every", int, 5000),
        ("--flush-every", int, 1000),
    ):
        ap.add_argument(flag, type=converter, default=fallback)
    # Endpoint/auth context can also come from the environment.
    ap.add_argument(
        "--base-url",
        default=os.environ.get("OPENAI_BASE_URL", "https://api.openai.com"),
    )
    ap.add_argument(
        "--organization",
        default=os.environ.get("OPENAI_ORG") or os.environ.get("OPENAI_ORGANIZATION"),
    )
    ap.add_argument("--project", default=os.environ.get("OPENAI_PROJECT"))
    ap.add_argument(
        "--extra-header",
        action="append",
        default=[],
        help='Extra header, format "Name: Value". Can be repeated.',
    )
    return ap.parse_args()


def normalize_key(line: str) -> str | None:
    stripped = line.strip()
    if not stripped or stripped.startswith("#"):
        return None
    return stripped.split()[0]


def parse_extra_headers(items: list[str]) -> dict[str, str]:
    """Parse repeated "Name: Value" strings into a header dict.

    Raises ValueError when an item has no colon or an empty header name.
    Later duplicates of the same name overwrite earlier ones.
    """
    result: dict[str, str] = {}
    for item in items:
        name, sep, value = item.partition(":")
        if not sep:
            raise ValueError(f"Invalid --extra-header value: {item}")
        header_name = name.strip()
        if not header_name:
            raise ValueError(f"Invalid --extra-header name: {item}")
        result[header_name] = value.strip()
    return result


def compute_sleep(attempt: int, backoff_base: float, backoff_max: float, retry_after: str | None) -> float:
    if retry_after:
        try:
            return min(backoff_max, float(retry_after))
        except ValueError:
            pass
    jitter = random.random() * 0.2
    return min(backoff_max, backoff_base * (2 ** attempt)) + jitter


def fetch_models_for_key(
        key: str,
        base_url: str,
        base_headers: dict[str, str],
        timeout: float,
        max_retries: int,
        backoff_base: float,
        backoff_max: float,
        ssl_context: ssl.SSLContext,
) -> tuple[list[str] | None, str, str]:
    """GET {base_url}/v1/models using *key* as a Bearer token.

    Returns a (models, status, detail) triple:
      * (sorted unique model ids, "ok", "") on HTTP 200 with a valid payload;
      * (None, "unauthorized", "http_401"/"http_403") when the key is rejected;
      * (None, <error status>, <detail>) otherwise, where <error status> is one
        of "invalid_json", "invalid_response", "http_error", "retry_exhausted",
        "network_error" or "exception".

    Retries up to *max_retries* additional attempts on 429/5xx responses and
    network errors, sleeping per compute_sleep() between attempts (honoring a
    Retry-After header when one is present).
    """
    headers = dict(base_headers)
    headers["Authorization"] = f"Bearer {key}"
    url = f"{base_url}/v1/models"

    for attempt in range(max_retries + 1):
        try:
            req = urllib.request.Request(url, headers=headers, method="GET")
            with urllib.request.urlopen(req, timeout=timeout, context=ssl_context) as resp:
                status = resp.getcode()
                body = resp.read()
            # NOTE(review): urlopen normally raises HTTPError for non-2xx, so
            # the non-200 branches below are defensive; the HTTPError handler
            # further down mirrors the same classification.
            if status == 200:
                try:
                    payload = json.loads(body)
                except json.JSONDecodeError:
                    return None, "invalid_json", ""
                data = payload.get("data")
                if not isinstance(data, list):
                    return None, "invalid_response", ""
                # Deduplicate via a set comprehension, keeping only dict items
                # that carry a truthy "id".
                models = sorted(
                    {item.get("id") for item in data if isinstance(item, dict) and item.get("id")}
                )
                return models, "ok", ""
            if status in (401, 403):
                return None, "unauthorized", f"http_{status}"
            if status == 429 or status >= 500:
                # Transient: back off and retry unless attempts are exhausted.
                if attempt < max_retries:
                    time.sleep(compute_sleep(attempt, backoff_base, backoff_max, None))
                    continue
                return None, "retry_exhausted", f"http_{status}"
            return None, "http_error", f"http_{status}"
        except urllib.error.HTTPError as exc:
            status = exc.code
            retry_after = exc.headers.get("Retry-After") if exc.headers else None
            if status in (401, 403):
                return None, "unauthorized", f"http_{status}"
            if status == 429 or status >= 500:
                if attempt < max_retries:
                    time.sleep(compute_sleep(attempt, backoff_base, backoff_max, retry_after))
                    continue
                return None, "retry_exhausted", f"http_{status}"
            return None, "http_error", f"http_{status}"
        except urllib.error.URLError as exc:
            # DNS/TLS/connection failures: retry with backoff, no Retry-After.
            if attempt < max_retries:
                time.sleep(compute_sleep(attempt, backoff_base, backoff_max, None))
                continue
            return None, "network_error", str(exc)
        except Exception as exc:  # noqa: BLE001
            # Catch-all so one bad key cannot kill a worker thread.
            return None, "exception", str(exc)

    # Unreachable in practice (every loop path returns or continues), kept as
    # a safety net.
    return None, "retry_exhausted", "unknown"


class Stats:
    """Thread-safe progress counters with periodic console logging."""

    def __init__(self, log_every: int) -> None:
        self.log_every = log_every
        self.lock = threading.Lock()
        self.processed = 0
        self.ok = 0
        self.bad = 0
        self.error = 0
        self.total = 0
        self.start = time.time()

    def set_total(self, total: int) -> None:
        """Record the total number of queued keys (enables ETA reporting)."""
        with self.lock:
            self.total = total

    def update(self, status: str) -> None:
        """Count one finished key; print a progress line every log_every items."""
        with self.lock:
            self.processed += 1
            if status == "ok":
                self.ok += 1
            elif status == "unauthorized":
                self.bad += 1
            else:
                self.error += 1
            if self.log_every <= 0 or self.processed % self.log_every:
                return
            elapsed = max(time.time() - self.start, 0.001)
            rate = self.processed / elapsed
            if self.total > 0:
                remaining = max(self.total - self.processed, 0)
                eta = remaining / rate if rate > 0 else 0.0
                pct = (self.processed / self.total) * 100
                print(
                    f"processed={self.processed}/{self.total} "
                    f"({pct:.1f}%) ok={self.ok} bad={self.bad} "
                    f"error={self.error} rate={rate:.1f}/s eta={eta:.0f}s",
                    flush=True,
                )
            else:
                print(
                    f"processed={self.processed} ok={self.ok} "
                    f"bad={self.bad} error={self.error} rate={rate:.1f}/s",
                    flush=True,
                )


def worker(
        key_queue: queue.Queue,
        result_queue: queue.Queue,
        stats: Stats,
        base_url: str,
        base_headers: dict[str, str],
        timeout: float,
        max_retries: int,
        backoff_base: float,
        backoff_max: float,
        ssl_context: ssl.SSLContext,
) -> None:
    """Consume keys from key_queue until a None sentinel, checking each one.

    Every successful or failed check is pushed onto result_queue and counted
    in stats; task_done() is always signalled, sentinel included.
    """
    while True:
        current = key_queue.get()
        try:
            if current is None:
                # Sentinel: this worker is done.
                return
            models, status, detail = fetch_models_for_key(
                key=current,
                base_url=base_url,
                base_headers=base_headers,
                timeout=timeout,
                max_retries=max_retries,
                backoff_base=backoff_base,
                backoff_max=backoff_max,
                ssl_context=ssl_context,
            )
            result_queue.put((current, models, status, detail))
            stats.update(status)
        finally:
            key_queue.task_done()


def writer(
        result_queue: queue.Queue,
        output_path: str,
        bad_output_path: str | None,
        flush_every: int,
) -> None:
    out_fp = open(output_path, "w", encoding="utf-8")
    bad_fp = open(bad_output_path, "w", encoding="utf-8") if bad_output_path else None
    written = 0
    bad_written = 0
    try:
        while True:
            item = result_queue.get()
            try:
                if item is None:
                    return
                key, models, status, detail = item
                if status == "ok" and models is not None:
                    out_fp.write(f"{key}\t{','.join(models)}\n")
                    written += 1
                    if flush_every > 0 and written % flush_every == 0:
                        out_fp.flush()
                elif bad_fp is not None:
                    bad_fp.write(f"{key}\t{status}\t{detail}\n")
                    bad_written += 1
                    if flush_every > 0 and bad_written % flush_every == 0:
                        bad_fp.flush()
            finally:
                result_queue.task_done()
    finally:
        out_fp.flush()
        out_fp.close()
        if bad_fp is not None:
            bad_fp.flush()
            bad_fp.close()


def main() -> int:
    """Entry point: read keys from disk, fan out across worker threads,
    and write per-key results via a single writer thread.

    Pipeline: reader (this thread) -> key_queue -> N workers -> result_queue
    -> writer thread.  Returns 0 on completion.
    """
    args = parse_args()
    if args.concurrency <= 0:
        raise SystemExit("--concurrency must be > 0")
    if args.queue_size <= 0:
        raise SystemExit("--queue-size must be > 0")

    # Headers shared by every request; the per-key Authorization header is
    # added inside fetch_models_for_key.
    base_url = args.base_url.rstrip("/")
    base_headers = {"Accept": "application/json"}
    if args.organization:
        base_headers["OpenAI-Organization"] = args.organization
    if args.project:
        base_headers["OpenAI-Project"] = args.project
    if args.extra_header:
        base_headers.update(parse_extra_headers(args.extra_header))

    # Bounded queues so the reader cannot race arbitrarily far ahead of the
    # workers (and workers ahead of the writer).
    key_queue: queue.Queue = queue.Queue(maxsize=args.queue_size)
    result_queue: queue.Queue = queue.Queue(maxsize=args.queue_size)
    stats = Stats(log_every=args.log_every)
    ssl_context = ssl.create_default_context()

    bad_output = args.bad_output.strip() or None
    writer_thread = threading.Thread(
        target=writer,
        args=(result_queue, args.output, bad_output, args.flush_every),
        daemon=True,
    )
    writer_thread.start()

    print(
        "start "
        f"input={args.input} output={args.output} bad_output={bad_output or '-'} "
        f"concurrency={args.concurrency} queue_size={args.queue_size} "
        f"timeout={args.timeout} max_retries={args.max_retries}",
        flush=True,
    )

    workers: list[threading.Thread] = []
    for _ in range(args.concurrency):
        t = threading.Thread(
            target=worker,
            args=(
                key_queue,
                result_queue,
                stats,
                base_url,
                base_headers,
                args.timeout,
                args.max_retries,
                args.backoff_base,
                args.backoff_max,
                ssl_context,
            ),
            daemon=True,
        )
        t.start()
        workers.append(t)

    # Stream keys straight from disk into the queue; the input file is never
    # fully loaded into memory.
    queued = 0
    with open(args.input, "r", encoding="utf-8", errors="ignore") as handle:
        for line in handle:
            key = normalize_key(line)
            if not key:
                continue
            key_queue.put(key)
            queued += 1
            if args.log_every > 0 and queued % args.log_every == 0:
                print(f"queued={queued}", flush=True)

    stats.set_total(queued)
    print(f"queued_total={queued}", flush=True)

    # One None sentinel per worker tells each thread to exit.
    for _ in range(args.concurrency):
        key_queue.put(None)

    # Shutdown ordering matters: first wait for the workers to drain
    # key_queue, then for the writer to consume everything already on
    # result_queue, and only then send the writer its sentinel.
    key_queue.join()
    for t in workers:
        t.join()

    result_queue.join()
    result_queue.put(None)
    writer_thread.join()

    elapsed = max(time.time() - stats.start, 0.001)
    print(
        f"done processed={stats.processed} ok={stats.ok} bad={stats.bad} "
        f"error={stats.error} seconds={elapsed:.1f}",
        flush=True,
    )
    return 0


# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())

# Related discussion: https://linux.do/t/topic/1496409/26


# 📌 Repost information
# Original author: gggwl2021
# Reposted: 2026/1/22 13:16:04
#
# Tags: openai api, Python Script, Gemini API, API Key Validation, Concurrent Programming