The ReAct Agent Architecture in 2026: Native Tool Calling and the LangGraph State Machine
ReAct (Reason + Act) addresses the most classic problem in open-ended research. In this post we build a Research Brief Agent: it searches the web, fetches real URLs, compresses the evidence, and produces a structured brief with genuine citations. The point is not the feature set but the correct way to write it — no longer relying on brittle "Thought: / Action:" string parsing.

The original ReAct paper demonstrated that having the LLM write out its reasoning before acting yields noticeably better results. The implementations of that era were essentially prompt hacks. You handed the model a prompt along the lines of "You must use this format: Thought: ... / Action: ... / Action Input: ...", the model returned a string, Python used regular expressions to dig out the tool name and arguments, and the tool's result was spliced back into the prompt as "Observation: ...". That barely limped along at the demo stage and fell apart in production. The model hallucinated format violations nonstop: one moment it dropped the "Action:" prefix, the next it called a tool that did not exist, and the regex crashed on the spot.

That style is long obsolete, but the core rhythm of Reason, Act, Observe still holds; only the execution model has changed completely. Modern tool-use systems no longer parse strings — they use native, structured API tool calling. Schema validation is the LLM provider's job — OpenAI, Anthropic, and Google all work this way — so the strictness lives on their side. The new ReAct loop looks roughly like this: the model emits a structured tool call, the runtime executes it and appends the result to state, and the loop runs until the LLM decides the evidence is sufficient, at which point it returns an ordinary text reply instead of another tool call.

Time to write code. The first task is defining the state schema. In a deterministic workflow, state is usually a handful of discrete fields — raw_diff, has_critical_findings, and the like. In an open-ended ReAct loop, state mostly takes the form of an append-only ledger of messages. Messages alone are not enough, though. Citations need tracking too: beyond having the LLM write a summary, we want a concrete list of citations that can be rendered in a UI. Note what the reducers do: add_messages appends new messages instead of overwriting them, and operator.add does the same for the citations and URL lists. These two small tools are what maintain history across the loop.

Before wiring up the graph, we need tools. A common mistake is giving the agent a tool that returns full raw HTML — the context window gets blown out before the first loop iteration even finishes. Two ordinary HTTP tools are enough: search_web finds candidate links, fetch_url pulls the actual content. The error handling deserves a closer look: httpx raises on timeout; we catch the exception and return it as a JSON string. A single dead link must not be able to kill the execution of the entire graph. Because errors flow back as strings, the LLM reads each one as an observation and naturally moves on to try a different link.

The structure of the ReAct graph is surprisingly simple — in essence it needs just two main nodes: one where the LLM reasons, one that executes tools. The nodes and routing are written out in the code below. reason_node is where the cognitive work actually happens; the entire message history is fed to the model in one shot, and when the information is insufficient, the model replies with an AIMessage carrying tool_calls. The routing function route_after_reason inspects that message: if it contains tool calls, the graph switches to the tools node. LangGraph ships a prebuilt ToolNode that automatically unpacks arguments, runs the Python functions, and produces observations in the form of ToolMessages.

Letting ToolNode dump its results straight into state and stopping there would waste an opportunity. Citations should be extracted as the loop runs, rather than waiting until the final brief and asking the LLM to reconstruct every URL from memory. So after the tool node we add a sanitize node. It intercepts the raw tool output, parses it, updates the structured citations ledger, and maintains stagnant_turns along the way. If the agent goes three consecutive turns without finding a single new URL, it is stuck.

The single biggest problem with an open-ended agent is failure to converge. You ask about 2026 practices and it dives head-first into 2018 API docs — and when that happens, you need to know exactly which search results led it astray. State is nothing more than a list of messages; without persistence, everything is gone the moment the script exits. The Postgres checkpointer built in earlier posts now earns its keep. Hashing the topic into a thread_id means a second run on the same topic resumes from the point of interruption instead of starting from zero. There is a more practical benefit as well: the entire ReAct loop leaves a complete, queryable record in the database.

With the pieces assembled, the workflow looks smooth: the agent reasons, tools execute, state is sanitized, and the loop comes around again. Once a termination condition fires, execution drops into the finalization stage.

Run this code and most of the time it performs beautifully: it searches, fetches a few pages, and writes a solid brief. But sometimes it simply cannot stop. The LLM decides the information at hand still falls a bit short, so it searches again with slightly different keywords, fetches one more page, searches again. Every new ToolMessage stretches the context window, and the agent gradually loses focus, confused by the mountain of text it has piled up itself. The consequences of an unconstrained loop are very concrete — an agent can happily fire off 100 tool calls in a row, burn through the API budget, and finally die outright when it exceeds the model's maximum token limit.

The max_steps check in the router is only the baseline; stagnant_turns is there to catch the moment the agent gets stuck. Production demands far more: real-time monitoring of token usage, circuit breakers for specific tools, and an escape hatch that lets the agent hand off to a human when it truly cannot find an answer — a whole set of constraints has to come along.

Below is the complete script for the Research Brief Agent. Save it, configure GEMINI_API_KEY, start a local Postgres instance, and run it to see the results.

At this point we have a working ReAct loop: native tool calling, persisted state, and brittle string parsing replaced by structured evidence collection. The cost-runaway problem remains, though. In a follow-up we will lock this loop down — hard constraints, token budgets, and loop-detection algorithms together — and turn this prototype into a pattern fit to run long-term in production.

https://avoid.overfit.cn/post/1f0a889672024b59bb8791df1fc2b5e5 by Anubhav
The Problems Early ReAct Left Behind
You have access to tools. You must use this format:
Thought: [your thought]
Action: [tool_name]
Action Input: [tool_input]
Observation: [result]
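To make the fragility concrete, here is a minimal sketch of the regex-scraping approach described above. It is hypothetical illustration code, not from the original post — but it captures the failure mode exactly:

```python
import re

# The legacy parser: scrape the tool name and input out of free-form text.
LEGACY_FORMAT = re.compile(
    r"Action:\s*(?P<tool>\w+)\s*\n+Action Input:\s*(?P<tool_input>.+)",
    re.DOTALL,
)

def parse_legacy_react(completion: str):
    """Return (tool_name, tool_input), or None when the model drifts
    from the expected format -- the classic production failure."""
    match = LEGACY_FORMAT.search(completion)
    if match is None:
        return None
    return match.group("tool").strip(), match.group("tool_input").strip()

good = parse_legacy_react(
    "Thought: I need recent sources.\nAction: search_web\nAction Input: react agent failures"
)
# One dropped "Action:" prefix and the parse silently fails:
bad = parse_legacy_react(
    "Thought: I need recent sources.\nsearch_web\nInput: react agent failures"
)
```

The parse failure returns None rather than raising, but either way the agent loop derails — which is precisely why this responsibility moved to the provider side.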
ReAct in 2026: Native Tool Calling
The model now replies with a structured call such as {"name": "search_web", "arguments": {"query": "react agent failures"}}, and the result comes back appended to state as a ToolMessage.
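A toy dispatcher (stdlib only, with an illustrative stand-in tool) shows what the runtime side of the new loop reduces to — no regex, just a dict lookup on an already-validated structured call:

```python
import json

def search_web(query: str) -> str:
    # Stand-in tool; the real one would hit the network
    return json.dumps([{"url": "https://example.com", "snippet": f"results for {query}"}])

TOOL_REGISTRY = {"search_web": search_web}

def execute_tool_call(call: dict) -> str:
    """Execute one structured tool call {"name": ..., "arguments": {...}}.
    The provider already validated the schema; we only dispatch."""
    fn = TOOL_REGISTRY[call["name"]]
    return fn(**call["arguments"])

observation = execute_tool_call(
    {"name": "search_web", "arguments": {"query": "react agent failures"}}
)
```

In practice LangGraph's ToolNode (shown later) plays the role of execute_tool_call, but the essential move is the same: arguments arrive as a JSON object, not as text to be scraped.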
Research Brief Agent: State and Schema
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
import operator
class ResearchState(TypedDict):
    topic: str
    # The core ledger of the conversation
    messages: Annotated[list[BaseMessage], add_messages]
    # The evidence store, accumulated across loop iterations
    citations: Annotated[list[dict[str, str]], operator.add]
    seen_urls: Annotated[list[str], operator.add]
    # Control variables that prevent infinite loops
    step_count: int
    max_steps: int
    stagnant_turns: int
    final_brief: str
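What the reducers buy you can be seen without LangGraph at all. The sketch below is a simplified stand-in for how a reducer declared in Annotated[...] gets applied during a state merge — it is not LangGraph's actual implementation, just the shape of the idea:

```python
import operator

def merge_update(state: dict, update: dict, reducers: dict) -> dict:
    """Simplified stand-in for a LangGraph-style state merge: keys with a
    reducer are combined (here: list append), all others are overwritten."""
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        merged[key] = reducer(state[key], value) if reducer else value
    return merged

state = {"citations": [{"url": "https://a.example"}], "step_count": 1}
update = {"citations": [{"url": "https://b.example"}], "step_count": 2}
merged = merge_update(state, update, {"citations": operator.add})
# citations grew to two entries; step_count was simply replaced
```

This is why a node can return only its delta ({"citations": [...]}) and the ledger still grows instead of being clobbered.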
Search and Fetch
import json
import urllib.parse
import re
import html
import httpx
from langchain_core.tools import tool
def _http_get(url: str, timeout: int = 12) -> str:
    with httpx.Client(timeout=timeout, headers={"User-Agent": "Mozilla/5.0"}) as client:
        response = client.get(url)
        response.raise_for_status()
        return response.text
@tool
def search_web(query: str, max_results: int = 5) -> str:
    """Search the web via DuckDuckGo Instant Answer API. Returns JSON list."""
    try:
        params = urllib.parse.urlencode({"q": query, "format": "json"})
        payload = _http_get(f"https://api.duckduckgo.com/?{params}")
        data = json.loads(payload)
        # ...(parsing logic that extracts URLs and snippets)...
        # For brevity, assume `results` ends up as a list shaped like:
        # [{"url": "...", "title": "...", "snippet": "..."}]
        return json.dumps(results[:max_results])
    except Exception as exc:
        return json.dumps([{"url": "", "title": "error", "snippet": str(exc)}])
@tool
def fetch_url(url: str) -> str:
    """Fetch and compress a URL into JSON: {url,title,snippet}."""
    try:
        raw_html = _http_get(url)
        # Strip script tags, style tags, and markup, leaving plain text
        no_script = re.sub(r"<script[^>]*>.*?</script>", " ", raw_html, flags=re.IGNORECASE | re.DOTALL)
        no_style = re.sub(r"<style[^>]*>.*?</style>", " ", no_script, flags=re.IGNORECASE | re.DOTALL)
        text = re.sub(r"<[^>]+>", " ", no_style)
        clean_text = html.unescape(re.sub(r"\s+", " ", text)).strip()
        # Keep only the first 2000 characters to spare the context window
        return json.dumps({"url": url, "title": "Fetched Page", "snippet": clean_text[:2000]})
    except Exception as exc:
        return json.dumps({"url": url, "title": "error", "snippet": str(exc)})
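The errors-as-strings pattern used in both tools can be isolated into a decorator. This is a hypothetical helper for illustration, not part of the article's code — it shows why a failing tool becomes an observation instead of a crash:

```python
import json

def as_observation(fn):
    """Hypothetical sketch of the error-handling pattern above: any exception
    is converted into a JSON string, so the LLM reads it as an observation
    and the graph keeps running instead of crashing."""
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception as exc:
            return json.dumps({"error": type(exc).__name__, "detail": str(exc)})
    return wrapper

@as_observation
def flaky_fetch(url: str) -> str:
    # Simulate the kind of failure httpx would raise on a dead link
    raise TimeoutError(f"timed out fetching {url}")

obs = flaky_fetch("https://dead.example")
```

Given an observation like this, a capable model will typically move on to the next candidate link on its own — no retry logic required in the graph.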
The Shape of the Graph: Smaller Than You'd Think
from langgraph.graph import StateGraph, START, END
from langgraph.prebuilt import ToolNode
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage
async def reason_node(state: ResearchState, llm_with_tools) -> dict:
    system = SystemMessage(
        content=(
            "You are a research brief agent. Use tools to gather evidence and cite URLs. "
            "Prefer search first, then fetch_url for top links. "
            "When you have enough evidence, return a concise final brief. "
            "Never fabricate citations."
        )
    )
    # The LLM reads the full message history, then decides the next action
    response = await llm_with_tools.ainvoke([system] + state["messages"])
    return {
        "messages": [response],
        "step_count": state["step_count"] + 1,
    }
def route_after_reason(state: ResearchState) -> str:
    # Hard stop condition
    if state["step_count"] >= state["max_steps"]:
        return "finalize"
    last_message = state["messages"][-1]
    # The LLM chose to call a tool, so route to the tool-execution node
    if isinstance(last_message, AIMessage) and last_message.tool_calls:
        return "tools"
    # No tool call means the LLM considers the research phase done
    return "finalize"

The Interceptor: Sanitizing Observations
import json
def sanitize_observation_node(state: ResearchState) -> dict:
# Look at the most recent tool messages
recent_messages = state["messages"][-3:]
new_citations = []
new_urls = []
for msg in recent_messages:
if getattr(msg, "type", "") != "tool":
continue
try:
# Parse the JSON string the tool returned
data = json.loads(str(msg.content))
items = data if isinstance(data, list) else [data]
for item in items:
url = item.get("url", "").strip()
if url and url not in state["seen_urls"]:
new_urls.append(url)
new_citations.append({
"url": url,
/* Lines 230-231 omitted */
"snippet": item.get("snippet", "")
})
except Exception:
pass
# Track whether the agent is stuck, finding nothing new
stagnant_turns = state["stagnant_turns"] + 1 if not new_urls else 0
return {
"citations": new_citations,
"seen_urls": new_urls,
"stagnant_turns": stagnant_turns,
}

Persistence and Replay
import hashlib
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from psycopg_pool import AsyncConnectionPool
def topic_to_thread_id(topic: str) -> str:
digest = hashlib.sha1(topic.encode("utf-8")).hexdigest()[:10]
return f"research-{digest}"
# Inside the run function:
db_uri = "postgresql://postgres:postgres@localhost:5432/postgres"
async with AsyncConnectionPool(conninfo=db_uri, max_size=10) as pool:
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
app = graph.compile(checkpointer=checkpointer)
thread_id = topic_to_thread_id(topic)
config = {"configurable": {"thread_id": thread_id}}
# Run the graph ...

Putting the Graph Together
async def finalize_node(state: ResearchState, llm) -> dict:
# Assemble a clean evidence block from the sanitized ledger
citation_block = "\n".join(
f"- {c['title']} | {c['url']} | {c['snippet'][:180]}"
for c in state["citations"][:8]
)
prompt = (
f"Topic: {state['topic']}\n\n"
"Write a concise research brief in 8-12 bullets.\n"
"Each bullet should be evidence-backed where possible.\n"
"Then add a short 'Citations' section with URL list only.\n\n"
f"Evidence:\n{citation_block if citation_block else '- No citations collected.'}"
)
response = await llm.ainvoke([HumanMessage(content=prompt)])
return {
"final_brief": str(response.content),
}
# Build the execution graph
def build_graph(tools, llm):
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)
builder = StateGraph(ResearchState)
builder.add_node("reason", lambda state: reason_node(state, llm_with_tools))
builder.add_node("tools", tool_node)
builder.add_node("sanitize", sanitize_observation_node)
builder.add_node("finalize", lambda state: finalize_node(state, llm))
builder.add_edge(START, "reason")
builder.add_conditional_edges(
"reason",
route_after_reason,
{"tools": "tools", "finalize": "finalize"}
)
builder.add_edge("tools", "sanitize")
# A second conditional edge that checks for stagnation
def route_after_sanitize(state):
if state["step_count"] >= state["max_steps"] or state["stagnant_turns"] >= 3:
return "finalize"
return "reason"
builder.add_conditional_edges(
"sanitize",
route_after_sanitize,
{"reason": "reason", "finalize": "finalize"}
)
builder.add_edge("finalize", END)
return builder

The 100th Tool Call Problem
The Complete Runnable Script
import argparse
import asyncio
import hashlib
import html
import json
import operator
import os
import re
from urllib.parse import urlencode
from typing import Annotated, Any, TypedDict
from dotenv import load_dotenv
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from psycopg_pool import AsyncConnectionPool
import httpx
load_dotenv()
def _http_get(url: str, timeout: int = 12) -> str:
with httpx.Client(timeout=timeout, headers={"User-Agent": "Mozilla/5.0"}) as client:
response = client.get(url)
response.raise_for_status()
return response.text
def _clean_text_from_html(raw_html: str) -> tuple[str, str]:
title_match = re.search(r"<title[^>]*>(.*?)</title>", raw_html, flags=re.IGNORECASE | re.DOTALL)
title = html.unescape(title_match.group(1).strip()) if title_match else "Untitled"
no_script = re.sub(r"<script[^>]*>.*?</script>", " ", raw_html, flags=re.IGNORECASE | re.DOTALL)
no_style = re.sub(r"<style[^>]*>.*?</style>", " ", no_script, flags=re.IGNORECASE | re.DOTALL)
text = re.sub(r"<[^>]+>", " ", no_style)
text = html.unescape(re.sub(r"\s+", " ", text)).strip()
return title, text
@tool
def search_web(query: str, max_results: int = 5) -> str:
"""Search the web via DuckDuckGo Instant Answer API. Returns JSON list."""
try:
params = urlencode(
{
"q": query,
"format": "json",
"no_html": "1",
"no_redirect": "1",
"skip_disambig": "1",
}
)
payload = _http_get(f"https://api.duckduckgo.com/?{params}")
data = json.loads(payload)
results: list[dict[str, str]] = []
abstract_url = (data.get("AbstractURL") or "").strip()
if abstract_url:
results.append(
{
"url": abstract_url,
"title": (data.get("Heading") or "DuckDuckGo Abstract").strip(),
"snippet": (data.get("AbstractText") or "").strip(),
}
)
def _collect_topics(items: list[dict[str, Any]]) -> None:
for item in items:
if "FirstURL" in item:
results.append(
{/* Lines 437-440 omitted */}
)
elif "Topics" in item and isinstance(item["Topics"], list):
_collect_topics(item["Topics"])
related = data.get("RelatedTopics")
if isinstance(related, list):
_collect_topics(related)
deduped: dict[str, dict[str, str]] = {}
for item in results:
if item["url"]:
deduped[item["url"]] = item
trimmed = list(deduped.values())[: max(1, max_results)]
return json.dumps(trimmed)
except Exception as exc:
return json.dumps([{"url": "", "title": "search_error", "snippet": str(exc)}])
@tool
def fetch_url(url: str) -> str:
"""Fetch and compress a URL into JSON: {url,title,snippet}."""
try:
raw_html = _http_get(url)
title, text = _clean_text_from_html(raw_html)
return json.dumps({"url": url, "title": title, "snippet": text[:900]})
except Exception as exc:
return json.dumps({"url": url, "title": "fetch_error", "snippet": str(exc)})
class ResearchState(TypedDict):
topic: str
messages: Annotated[list[BaseMessage], add_messages]
citations: Annotated[list[dict[str, str]], operator.add]
seen_urls: Annotated[list[str], operator.add]
step_count: int
max_steps: int
stagnant_turns: int
final_brief: str
def parse_citations_from_tool_messages(messages: list[BaseMessage]) -> tuple[list[dict[str, str]], list[str]]:
citations: list[dict[str, str]] = []
seen_urls: list[str] = []
for message in messages:
if getattr(message, "type", "") != "tool":
continue
content = getattr(message, "content", "")
if not isinstance(content, str):
continue
try:
parsed = json.loads(content)
except Exception:
continue
candidates = parsed if isinstance(parsed, list) else [parsed]
for item in candidates:
if not isinstance(item, dict):
continue
url = str(item.get("url", "")).strip()
if not url:
continue
seen_urls.append(url)
citations.append(
{
"url": url,
/* Lines 511-512 omitted */
"snippet": str(item.get("snippet", "")).strip(),
}
)
return citations, seen_urls
def compact_unique_citations(citations: list[dict[str, str]]) -> list[dict[str, str]]:
deduped: dict[str, dict[str, str]] = {}
for c in citations:
url = c.get("url", "")
if not url:
continue
if url not in deduped:
deduped[url] = c
return list(deduped.values())
async def reason_node(state: ResearchState, llm_with_tools: Any) -> dict[str, Any]:
system = SystemMessage(
content=(
"You are a research brief agent. Use tools to gather evidence and cite URLs. "
"Prefer search first, then fetch_url for top links. "
"When you have enough evidence, return a concise final brief. "
"Never fabricate citations."
)
)
response = await llm_with_tools.ainvoke([system] + state["messages"])
return {
"messages": [response],
"step_count": state["step_count"] + 1,
}
def route_after_reason(state: ResearchState) -> str:
if state["step_count"] >= state["max_steps"]:
return "finalize"
last = state["messages"][-1]
if isinstance(last, AIMessage) and last.tool_calls:
return "tools"
return "finalize"
def sanitize_observation_node(state: ResearchState) -> dict[str, Any]:
citations, seen = parse_citations_from_tool_messages(state["messages"][-3:])
already_seen = set(state["seen_urls"])
new_seen = [u for u in seen if u and u not in already_seen]
new_citations = [c for c in citations if c["url"] in set(new_seen)]
new_url_count = len(new_seen)
stagnant_turns = state["stagnant_turns"] + 1 if new_url_count == 0 else 0
return {
"citations": new_citations,
"seen_urls": new_seen,
"stagnant_turns": stagnant_turns,
}
def route_after_sanitize(state: ResearchState) -> str:
if state["step_count"] >= state["max_steps"]:
return "finalize"
if state["stagnant_turns"] >= 3:
return "finalize"
return "reason"
async def finalize_node(state: ResearchState, llm: Any) -> dict[str, Any]:
citation_block = "\n".join(
f"- {c['title']} | {c['url']} | {c['snippet'][:180]}"
for c in compact_unique_citations(state["citations"])[:8]
)
prompt = (
f"Topic: {state['topic']}\n\n"
"Write a concise research brief in 8-12 bullets.\n"
"Each bullet should be evidence-backed where possible.\n"
"Then add a short 'Citations' section with URL list only.\n\n"
f"Evidence:\n{citation_block if citation_block else '- No citations collected.'}"
)
response = await llm.ainvoke([HumanMessage(content=prompt)])
return {
"final_brief": str(response.content),
"messages": [AIMessage(content=f"Final brief generated with {len(state['citations'])} citations.")],
}
def build_graph(tools: list[Any], llm: Any):
llm_with_tools = llm.bind_tools(tools)
tool_node = ToolNode(tools)
builder = StateGraph(ResearchState)
async def _reason(state: ResearchState) -> dict[str, Any]:
return await reason_node(state, llm_with_tools)
async def _finalize(state: ResearchState) -> dict[str, Any]:
return await finalize_node(state, llm)
builder.add_node("reason", _reason)
builder.add_node("tools", tool_node)
builder.add_node("sanitize", sanitize_observation_node)
builder.add_node("finalize", _finalize)
builder.add_edge(START, "reason")
builder.add_conditional_edges("reason", route_after_reason, {"tools": "tools", "finalize": "finalize"})
builder.add_edge("tools", "sanitize")
builder.add_conditional_edges("sanitize", route_after_sanitize, {"reason": "reason", "finalize": "finalize"})
builder.add_edge("finalize", END)
return builder
def topic_to_thread_id(topic: str) -> str:
digest = hashlib.sha1(topic.encode("utf-8")).hexdigest()[:10]
return f"research-{digest}"
async def run(topic: str, max_steps: int) -> None:
db_uri = os.getenv("DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/postgres")
model_name = os.getenv("GEMINI_MODEL", "gemini-2.5-flash")
llm = ChatGoogleGenerativeAI(model=model_name, temperature=0.1)
tools = [search_web, fetch_url]
graph = build_graph(tools=tools, llm=llm)
async with AsyncConnectionPool(conninfo=db_uri, max_size=10) as pool:
checkpointer = AsyncPostgresSaver(pool)
await checkpointer.setup()
app = graph.compile(checkpointer=checkpointer)
thread_id = topic_to_thread_id(topic)
config = {"configurable": {"thread_id": thread_id}}
initial: ResearchState = {
"topic": topic,
"messages": [HumanMessage(content=f"Research this topic: {topic}")],
"citations": [],
"seen_urls": [],
"step_count": 0,
"max_steps": max_steps,
"stagnant_turns": 0,
"final_brief": "",
}
result = await app.ainvoke(initial, config=config)
final_brief = result.get("final_brief", "").strip()
citations = compact_unique_citations(result.get("citations", []))
print("\n" + "=" * 70)
print(f"Thread ID: {thread_id}")
print(f"Model: {model_name}")
print(f"Tools loaded: {[getattr(t, 'name', str(t)) for t in tools]}")
print("=" * 70)
print("\nResearch Brief:\n")
print(final_brief or "(No final brief generated.)")
print("\nCitations:\n")
if citations:
for idx, c in enumerate(citations, start=1):
print(f"{idx}. {c['url']}")
else:
print("No citations captured.")
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="ReAct research agent (LangGraph + Postgres).")
parser.add_argument(
"--topic",
type=str,
default="Practical use cases of ReAct agents in 2026 and common failure modes",
help="Research topic to investigate.",
)
parser.add_argument(
"--max-steps",
type=int,
default=10,
help="Maximum ReAct loop steps before forced finalization.",
)
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
asyncio.run(run(topic=args.topic, max_steps=args.max_steps))