LangGraph Server + AsyncPostgresSaver + unicorn 启动
使用 django 的 command 启动 langGraph Server, 但要求基于 AsyncPostgresSaver,
没有找到相关的可用的代码, 这里记录下 直接抛出代码"""
Run the LangGraph Agent Server
"""
import asyncio
import uvicorn
from django.core.management.base import BaseCommand
from django.conf import settings
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from langserve import add_routes
from langchain_core.globals import set_verbose
from psycopg_pool import AsyncConnectionPool
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from apps.ai.graph.app import AsyncGraphApp
class Command(BaseCommand):
"""
Run the LangGraph Agent Server
"""
help = "Starts the LangGraph Agent Server"
def add_arguments(self, parser):
parser.add_argument("--host", type=str, default="0.0.0.0")
parser.add_argument("--port", type=int, default=2028)
def handle(self, *args, **options):
asyncio.run(self.handle_async(*args, **options))
async def handle_async(self, *args, **options):
"""启动 Agent Server"""
host = options["host"]
port = options["port"]
self.stdout.write(f"Starting Agent Server at http://{host}:{port}...")
# Get the LangGraph application
checkpointer = await self.get_checkpointer()
graph_app = AsyncGraphApp().compile(checkpointer=checkpointer).app
print(f"graph_app----------------->: {graph_app}")
# Initialize FastAPI app
app = FastAPI(
title="Baby Consultant Agent",
version="1.0",
description="A LangGraph-based agent for baby consultation",
)
# Set CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
set_verbose(True)
# Add routes using LangServe
# This exposes the graph at /agent/invoke, /agent/stream, etc.
add_routes(
app,
graph_app,
path="/agent",
)
# Run with Uvicorn
config = uvicorn.Config(app, host=host, port=port)
# 基于当前的Async Running Loop 启动unicorn
server = uvicorn.Server(config)
await server.serve()
async def get_checkpointer(self):
"""获取 Checkpointer"""
# 1. 显式创建连接池 (让它在应用生命周期内一直存活)
connection_kwargs = {
"autocommit": True,
"prepare_threshold": 0,
}
# 使用同步的 ConnectionPool
pool = AsyncConnectionPool(
conninfo=settings.LANGGRAPH_POSTGRES_CONNECTION_STRING,
max_size=20,
kwargs=connection_kwargs,
)
# 2. 将连接池传入构造函数
checkpointer = AsyncPostgresSaver(pool)
# 3. 初始化数据库表
await checkpointer.setup()
return checkpointer