How to Build a Modern Agent: OpenManus as a Case Study
1. Introduction

In today's rapidly advancing AI landscape, agents have become the key bridge between large language models and real-world applications. A modern agent not only understands natural-language instructions; more importantly, it can actively perform operations through tool calling to complete complex tasks. From writing code and analyzing data to browsing the web and manipulating files, agents are reshaping how we interact with computers.

OpenManus is an open-source, general-purpose AI agent framework that demonstrates how to build a modern agent system with complete functionality and a clean architecture. Using the OpenManus project as a blueprint, this article systematically answers four core questions of modern agent construction: how the project is structured, how tools are added to an agent, how the agent calls those tools, and how the agent thinks.

By working through OpenManus's code, we will build a complete picture of modern agent architecture and derive practical guidance for building our own agent systems.
2. Project Structure and Core Code Architecture

2.1 Layered Architecture

Modern agent systems typically use a layered architecture in which each layer has a distinct responsibility. OpenManus adopts a clean four-layer design.
Base Layer:

`app/agent/base.py` — `BaseAgent` is the abstract base class for all agents and provides the infrastructure an agent needs to run:

```python
class BaseAgent(BaseModel, ABC):
    name: str                        # Unique agent identifier
    description: Optional[str]       # Agent description
    system_prompt: Optional[str]     # System-level instructions
    next_step_prompt: Optional[str]  # Prompt guiding the next action
    llm: LLM                         # Large language model instance
    memory: Memory                   # Conversation memory
    state: AgentState                # Current state (IDLE/RUNNING/FINISHED/ERROR)
    max_steps: int = 10              # Maximum number of execution steps
    current_step: int = 0            # Current step count
```

Core features:

- The `state_context` context manager implements safe state transitions.
- `update_memory()` manages the conversation history in one place.
- `run()` implements the main execution loop, including a step limit and stuck detection:

```python
async def run(self, request: Optional[str] = None) -> str:
    if request:
        self.update_memory("user", request)
    async with self.state_context(AgentState.RUNNING):
        while (self.current_step < self.max_steps and
               self.state != AgentState.FINISHED):
            self.current_step += 1
            step_result = await self.step()  # Execute a single step
            if self.is_stuck():              # Detect repeated, unproductive output
                self.handle_stuck_state()
```
Reasoning Layer:

`app/agent/react.py` — `ReActAgent` implements the classic ReAct (Reasoning + Acting) pattern, splitting execution into a thinking phase and an acting phase:

```python
class ReActAgent(BaseAgent, ABC):
    @abstractmethod
    async def think(self) -> bool:
        """Process the current state and decide the next action."""

    @abstractmethod
    async def act(self) -> str:
        """Execute the decided action."""

    async def step(self) -> str:
        """Execute a single step: think, then act."""
        should_act = await self.think()
        if not should_act:
            return "Thinking complete - no action needed"
        return await self.act()
```

This design decouples decision-making from execution, making the agent's reasoning process clearer and easier to control.
Tool Call Layer:

`app/agent/toolcall.py` — `ToolCallAgent` builds on the ReAct pattern and implements the concrete tool-calling mechanism:

```python
class ToolCallAgent(ReActAgent):
    available_tools: ToolCollection  # Available tool set
    tool_choices: TOOL_CHOICE_TYPE = ToolChoice.AUTO
    tool_calls: List[ToolCall] = Field(default_factory=list)

    async def think(self) -> bool:
        # Call the LLM, passing in the tool list
        response = await self.llm.ask_tool(
            messages=self.messages,
            system_msgs=[Message.system_message(self.system_prompt)],
            tools=self.available_tools.to_params(),  # Tool list
            tool_choice=self.tool_choices,
        )
        # Parse the tool calls returned by the LLM
        self.tool_calls = response.tool_calls if response else []
        # ...

    async def act(self) -> str:
        # Execute the tool calls
        for command in self.tool_calls:
            result = await self.execute_tool(command)
            # Add the result to memory
            tool_msg = Message.tool_message(
                content=result,
                tool_call_id=command.id,
                name=command.function.name,
            )
            self.memory.add_message(tool_msg)
```
Application Layer:

`app/agent/manus.py` — `Manus` is the concrete business agent, configured with the actually available tool set:

```python
class Manus(ToolCallAgent):
    name: str = "Manus"
    system_prompt: str = SYSTEM_PROMPT.format(directory=config.workspace_root)

    # Configure the tool set
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(
            PythonExecute(),      # Python code execution
            BrowserUseTool(),     # Browser automation
            StrReplaceEditor(),   # File editing
            AskHuman(),           # Human interaction
            Terminate(),          # Termination tool
        )
    )
```
2.2 Core Code Modules

Data model:

`app/schema.py` defines the core data structures of the agent system:

```python
class Message(BaseModel):
    role: ROLE_TYPE                       # user/assistant/system/tool
    content: Optional[str]
    tool_calls: Optional[List[ToolCall]]
    tool_call_id: Optional[str]           # Links a result to its tool call
    base64_image: Optional[str]           # Multimodal support
```
LLM wrapper:

`app/llm.py` — the `LLM` class provides a unified interface to the underlying model. Its key method is `ask_tool()`, which supports function calling: it accepts a tool list and returns the model's tool-call decision:

```python
async def ask_tool(
    self,
    messages: List[Union[dict, Message]],
    system_msgs: Optional[List[Union[dict, Message]]] = None,
    tools: Optional[List[dict]] = None,
    tool_choice: TOOL_CHOICE_TYPE = ToolChoice.AUTO,
) -> ChatCompletionMessage:
    # Format the messages
    messages = self.format_messages(messages, supports_images)
    # Count tokens and enforce the limit
    input_tokens = self.count_message_tokens(messages)
    if not self.check_token_limit(input_tokens):
        raise TokenLimitExceeded(...)
    # Call the API
    response = await self.client.chat.completions.create(
        model=self.model,
        messages=messages,
        tools=tools,
        tool_choice=tool_choice,
    )
    return response.choices[0].message
```
Tool system:

`app/tool/` contains the tool implementations. The tool system is the core extension point for an agent's capabilities.

3. How Tools Are Added to an Agent
3.1 Tool Definition and Implementation

Tool base class:

All tools inherit from `BaseTool`, which defines the standard tool interface:

```python
class BaseTool(ABC, BaseModel):
    name: str                   # Tool name, must be unique
    description: str            # Description the LLM uses to decide whether to call it
    parameters: Optional[dict]  # Parameter definition in JSON Schema format

    @abstractmethod
    async def execute(self, **kwargs) -> Any:
        """Tool execution logic; subclasses must implement this."""
        pass

    def to_param(self) -> Dict:
        """Convert to the OpenAI function calling format."""
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }
```
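The same pattern can be sketched without the project's pydantic base classes. The `EchoTool` below is a hypothetical example for illustration, not part of OpenManus:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional


class SimpleTool(ABC):
    """Minimal stand-in for BaseTool: a name, a description, JSON Schema params."""
    name: str
    description: str
    parameters: Optional[dict] = None

    @abstractmethod
    async def execute(self, **kwargs) -> Any: ...

    def to_param(self) -> Dict:
        # Same shape the OpenAI chat completions API expects under "tools"
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.description,
                "parameters": self.parameters,
            },
        }


class EchoTool(SimpleTool):
    name = "echo"
    description = "Return the input text unchanged."
    parameters = {
        "type": "object",
        "properties": {"text": {"type": "string", "description": "Text to echo."}},
        "required": ["text"],
    }

    async def execute(self, text: str) -> str:
        return text


param = EchoTool().to_param()
print(param["function"]["name"])  # echo
```

The resulting dictionary is exactly what gets serialized into the `tools` array of a chat completion request.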
Tool implementation example:

Take the `PythonExecute` tool as an example:

```python
class PythonExecute(BaseTool):
    name: str = "python_execute"
    description: str = "Executes Python code string..."
    parameters: dict = {
        "type": "object",
        "properties": {
            "code": {
                "type": "string",
                "description": "The Python code to execute.",
            },
        },
        "required": ["code"],
    }

    async def execute(self, code: str, timeout: int = 5) -> Dict:
        """Execute Python code."""
        # Run in a separate process so a timeout can be enforced
        with multiprocessing.Manager() as manager:
            result = manager.dict({"observation": "", "success": False})
            proc = multiprocessing.Process(
                target=self._run_code, args=(code, result, safe_globals)
            )
            proc.start()
            proc.join(timeout)
            # ...
            return dict(result)
```

The quality of a tool's description directly affects how accurately the LLM selects it: a good description states what the tool does, when to use it, and what its limitations are.
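A simpler way to get the same isolation-plus-timeout behavior, sketched here as an alternative rather than how OpenManus actually implements it, is to run the code in a fresh child interpreter with `subprocess`:

```python
import subprocess
import sys


def run_python(code: str, timeout: int = 5) -> dict:
    """Run a code string in a separate interpreter, capturing stdout/stderr."""
    try:
        proc = subprocess.run(
            [sys.executable, "-c", code],
            capture_output=True, text=True, timeout=timeout,
        )
        return {"observation": proc.stdout or proc.stderr,
                "success": proc.returncode == 0}
    except subprocess.TimeoutExpired:
        return {"observation": f"Execution timed out after {timeout} seconds",
                "success": False}


print(run_python("print(1 + 1)"))  # {'observation': '2\n', 'success': True}
```

The tradeoff: `subprocess` pays interpreter startup cost on every call, while the `multiprocessing` approach can share prepared globals with the child.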
3.2 Tool Configuration in the Agent

Static tool configuration:

When the agent class is defined, the tool set is configured via `Field(default_factory=...)`:

```python
class Manus(ToolCallAgent):
    available_tools: ToolCollection = Field(
        default_factory=lambda: ToolCollection(
            PythonExecute(),
            BrowserUseTool(),
            StrReplaceEditor(),
            AskHuman(),
            Terminate(),
        )
    )
```

This approach suits cases where the available tools are known at agent initialization.
Dynamic tool addition:

`ToolCollection` provides methods for adding tools at runtime:

```python
class ToolCollection:
    def __init__(self, *tools: BaseTool):
        self.tools = tools
        self.tool_map = {tool.name: tool for tool in tools}

    def add_tool(self, tool: BaseTool):
        """Add a single tool."""
        if tool.name in self.tool_map:
            logger.warning(f"Tool {tool.name} already exists, skipping")
            return self
        self.tools += (tool,)
        self.tool_map[tool.name] = tool
        return self

    def add_tools(self, *tools: BaseTool):
        """Add multiple tools."""
        for tool in tools:
            self.add_tool(tool)
        return self
```
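The deduplication logic is easy to exercise in isolation. Here is a self-contained sketch that uses plain stub objects instead of `BaseTool` (all names here are illustrative):

```python
class ToolRegistry:
    """Name-keyed tool registry that skips duplicates, like ToolCollection."""

    def __init__(self, *tools):
        self.tools = tuple(tools)
        self.tool_map = {t.name: t for t in tools}

    def add_tool(self, tool):
        if tool.name in self.tool_map:
            return self  # Duplicate name: keep the existing tool
        self.tools += (tool,)
        self.tool_map[tool.name] = tool
        return self


class Stub:
    def __init__(self, name):
        self.name = name


reg = ToolRegistry(Stub("search"))
reg.add_tool(Stub("edit")).add_tool(Stub("search"))  # second "search" is skipped
print([t.name for t in reg.tools])  # ['search', 'edit']
```

Returning `self` from `add_tool` is what enables the chained-call style shown above.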
MCP工具的动态添加
class Manus(ToolCallAgent):
mcp_clients: MCPClients = Field(default_factory=MCPClients)
async def connect_mcp_server(
self, server_url: str, server_id: str = "", use_stdio: bool = False
) -> None:
"""连接MCP服务器并添加其工具"""
if use_stdio:
await self.mcp_clients.connect_stdio(server_url, [], server_id)
else:
await self.mcp_clients.connect_sse(server_url, server_id)
# 获取新工具并添加到可用工具集合
new_tools = [
tool for tool in self.mcp_clients.tools
if tool.server_id == server_id
]
self.available_tools.add_tools(*new_tools)
list_tools()获取服务器提供的工具列表MCPClientTool代理class MCPClientTool(BaseTool):
"""MCP工具的代理,执行时调用远程服务器"""
session: Optional[ClientSession] = None
server_id: str = ""
original_name: str = ""
async def execute(self, **kwargs) -> ToolResult:
"""通过MCP协议调用远程工具"""
result = await self.session.call_tool(self.original_name, kwargs)
return ToolResult(output=result.content)
3.3 Tool Schema Conversion

Tools must be converted into a format the LLM understands. The `to_param()` method converts a single tool into the OpenAI function calling format:

```python
def to_param(self) -> Dict:
    return {
        "type": "function",
        "function": {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters,  # JSON Schema format
        },
    }
```

`ToolCollection.to_params()` converts every tool into a list:

```python
def to_params(self) -> List[Dict[str, Any]]:
    return [tool.to_param() for tool in self.tools]
```

This tool list is passed to the LLM, which decides which tools to call based on their descriptions and the current context.
4. How the Agent Calls Its Tools

4.1 The Complete Tool-Call Flow

Tool calling follows the ReAct pattern and proceeds in three stages.

Step 1: Thinking (`think`)

The agent analyzes the current state and decides which tools to call:

```python
async def think(self) -> bool:
    # 1. Append the next-step prompt to the message history
    if self.next_step_prompt:
        user_msg = Message.user_message(self.next_step_prompt)
        self.messages += [user_msg]
    # 2. Call the LLM, passing in the tool list
    response = await self.llm.ask_tool(
        messages=self.messages,                  # Conversation history
        system_msgs=[Message.system_message(self.system_prompt)],
        tools=self.available_tools.to_params(),  # Tool list
        tool_choice=self.tool_choices,           # AUTO/REQUIRED/NONE
    )
    # 3. Parse the tool calls returned by the LLM
    self.tool_calls = response.tool_calls if response else []
    content = response.content if response else ""
    # 4. Create an assistant message and add it to memory
    assistant_msg = Message.from_tool_calls(
        content=content, tool_calls=self.tool_calls
    )
    self.memory.add_message(assistant_msg)
    return bool(self.tool_calls)
```

The LLM receives the system prompt, the full conversation history, and the tool schemas, and from these produces a structured tool-call decision.
Step 2: Acting (`act`)

The agent executes the tool calls the LLM decided on:

```python
async def act(self) -> str:
    if not self.tool_calls:
        return self.messages[-1].content or "No action to execute"
    results = []
    for command in self.tool_calls:
        # Execute a single tool call
        result = await self.execute_tool(command)
        # Wrap the result in a tool message
        tool_msg = Message.tool_message(
            content=result,
            tool_call_id=command.id,
            name=command.function.name,
        )
        self.memory.add_message(tool_msg)
        results.append(result)
    return "\n\n".join(results)
```
Step 3: Feeding Results Back

Tool results are appended to the conversation history so the next round of thinking can use them:

```python
async def execute_tool(self, command: ToolCall) -> str:
    name = command.function.name
    # 1. Look up the tool instance
    if name not in self.available_tools.tool_map:
        return f"Error: Unknown tool '{name}'"
    # 2. Parse the arguments
    args = json.loads(command.function.arguments or "{}")
    # 3. Execute the tool
    result = await self.available_tools.execute(name=name, tool_input=args)
    # 4. Handle special tools (e.g. Terminate)
    await self._handle_special_tool(name=name, result=result)
    # 5. Format the result
    observation = f"Observed output of cmd `{name}` executed:\n{str(result)}"
    return observation
```
4.2 Core Code Path

`ToolCollection.execute()` is the tool execution entry point:

```python
async def execute(
    self, *, name: str, tool_input: Dict[str, Any] = None
) -> ToolResult:
    # 1. Look up the tool by name
    tool = self.tool_map.get(name)
    if not tool:
        return ToolFailure(error=f"Tool {name} is invalid")
    try:
        # 2. Call the tool's execute method
        result = await tool(**tool_input)
        return result
    except ToolError as e:
        return ToolFailure(error=e.message)
```

Tool selection strategy:

The `tool_choice` parameter controls how the LLM selects tools:

```python
if self.tool_choices == ToolChoice.REQUIRED and not self.tool_calls:
    # A tool call was required but the LLM returned none; may need a retry
    return True
if self.tool_choices == ToolChoice.AUTO and not self.tool_calls:
    # Auto mode: no tool calls but text content present, so continue
    return bool(content)
```
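The branching above can be isolated into a small pure function. `should_continue` is a hypothetical helper written for illustration, not a function in OpenManus:

```python
from enum import Enum


class ToolChoice(str, Enum):
    AUTO = "auto"
    REQUIRED = "required"
    NONE = "none"


def should_continue(choice: ToolChoice, tool_calls: list, content: str) -> bool:
    """Decide whether a think() round should lead to an act() round."""
    if tool_calls:
        return True               # Tools were selected: act on them
    if choice == ToolChoice.REQUIRED:
        return True               # A tool call was required but missing: retry
    if choice == ToolChoice.AUTO:
        return bool(content)      # A plain text reply keeps the loop going
    return False


print(should_continue(ToolChoice.AUTO, [], "partial answer"))  # True
print(should_continue(ToolChoice.NONE, [], ""))                # False
```

Factoring the policy out this way makes the think/act boundary unit-testable without an LLM in the loop.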
4.3 The Execution Loop

The full execution loop lives in `BaseAgent.run()`:

```python
async def run(self, request: Optional[str] = None) -> str:
    if request:
        self.update_memory("user", request)
    results: List[str] = []
    async with self.state_context(AgentState.RUNNING):
        while (
            self.current_step < self.max_steps and
            self.state != AgentState.FINISHED
        ):
            self.current_step += 1
            # Execute a single step: think -> act
            step_result = await self.step()
            # Check whether the agent is stuck
            if self.is_stuck():
                self.handle_stuck_state()
            results.append(f"Step {self.current_step}: {step_result}")
    return "\n".join(results)
```

Every step is a complete think-act cycle, repeated until the task finishes or the step limit is reached.
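The shape of this loop can be demonstrated with a toy agent whose "LLM" is just a scripted list of decisions. Everything here (`ToyAgent`, the scripted plan) is invented for illustration:

```python
import asyncio


class ToyAgent:
    """Minimal think/act loop with a step limit, mirroring BaseAgent.run()."""

    def __init__(self, plan, max_steps=10):
        self.plan = list(plan)      # Scripted "LLM decisions"
        self.max_steps = max_steps
        self.current_step = 0
        self.finished = False

    async def step(self) -> str:
        action = self.plan.pop(0) if self.plan else "terminate"
        if action == "terminate":
            self.finished = True    # Analogue of AgentState.FINISHED
        return action

    async def run(self) -> str:
        results = []
        while self.current_step < self.max_steps and not self.finished:
            self.current_step += 1
            results.append(f"Step {self.current_step}: {await self.step()}")
        return "\n".join(results)


out = asyncio.run(ToyAgent(["search", "edit", "terminate"]).run())
print(out)
```

The loop exits either when the terminate action flips the finished flag or when `max_steps` is exhausted, just as in the real implementation.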
5. How the Agent Thinks

5.1 The ReAct Pattern: A Reasoning-Acting Loop

ReAct (Reasoning + Acting) is the core pattern of modern agents. It divides execution into three phases, reasoning, acting, and observing the result, which repeat until the task is complete. The implementation:

```python
async def step(self) -> str:
    should_act = await self.think()  # Think: analyze and decide
    if not should_act:
        return "Thinking complete - no action needed"
    return await self.act()          # Act: execute tools
```
5.2 LLM-Driven Decision-Making

Prompt engineering:

The agent's thinking ability rests largely on carefully designed prompts. The system prompt defines the agent's role and capability boundaries; the next-step prompt guides tool selection; and each tool's name, description, and parameters together form the basis of the LLM's decisions.

```python
SYSTEM_PROMPT = (
    "You are OpenManus, an all-capable AI assistant, aimed at solving any task "
    "presented by the user. You have various tools at your disposal that you can "
    "call upon to efficiently complete complex requests. Whether it's programming, "
    "information retrieval, file processing, web browsing, or human interaction "
    "(only for extreme cases), you can handle it all."
    "The initial directory is: {directory}"
)

NEXT_STEP_PROMPT = """
Based on user needs, proactively select the most appropriate tool or combination
of tools. For complex tasks, you can break down the problem and use different tools
step by step to solve it. After using each tool, clearly explain the execution
results and suggest the next steps.
If you want to stop the interaction at any point, use the `terminate` tool/function call.
"""
```
The Function Calling Mechanism

The LLM's function calling capability is what lets the agent make structured decisions:

```python
response = await self.llm.ask_tool(
    messages=self.messages,                  # Full conversation history
    system_msgs=[Message.system_message(self.system_prompt)],
    tools=self.available_tools.to_params(),  # Tool schema list
    tool_choice=self.tool_choices,           # Selection strategy
)
```

The LLM weighs the conversation and the tool descriptions, then returns a response of this shape:

```json
{
  "content": "I need to view the file contents first, then edit it",
  "tool_calls": [
    {
      "id": "call_abc123",
      "type": "function",
      "function": {
        "name": "str_replace_editor",
        "arguments": "{\"command\": \"view\", \"path\": \"/path/to/file\"}"
      }
    }
  ]
}
```
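Handling such a response boils down to decoding the `arguments` JSON string and dispatching by name. A minimal sketch (the response dict and the tool table below are illustrative):

```python
import json

response = {
    "content": "I need to view the file contents first, then edit it",
    "tool_calls": [{
        "id": "call_abc123",
        "type": "function",
        "function": {
            "name": "str_replace_editor",
            "arguments": '{"command": "view", "path": "/path/to/file"}',
        },
    }],
}


def dispatch(call, tool_map):
    """Decode one tool call and run the matching handler."""
    fn = call["function"]
    if fn["name"] not in tool_map:
        return f"Error: Unknown tool '{fn['name']}'"
    args = json.loads(fn["arguments"] or "{}")  # Arguments arrive as a JSON string
    return tool_map[fn["name"]](**args)


tool_map = {"str_replace_editor": lambda command, path: f"{command}:{path}"}
print(dispatch(response["tool_calls"][0], tool_map))  # view:/path/to/file
```

Note that `arguments` is a string, not a nested object, which is why a `json.loads` step is always required before calling the tool.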
5.3 State Management and Loop Control

Agent state machine:

State transitions follow an explicit state machine:

```python
class AgentState(str, Enum):
    IDLE = "IDLE"          # Idle, waiting for a task
    RUNNING = "RUNNING"    # Executing
    FINISHED = "FINISHED"  # Task complete
    ERROR = "ERROR"        # An error occurred
```

Transitions are safely controlled through a context manager:

```python
@asynccontextmanager
async def state_context(self, new_state: AgentState):
    previous_state = self.state
    self.state = new_state
    try:
        yield
    except Exception as e:
        self.state = AgentState.ERROR
        raise e
    finally:
        self.state = previous_state
```
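A self-contained version of this context manager shows the transition behavior concretely (note that, as written, the `finally` clause restores the previous state even on the error path, since it runs after the `except` block):

```python
import asyncio
from contextlib import asynccontextmanager
from enum import Enum


class AgentState(str, Enum):
    IDLE = "IDLE"
    RUNNING = "RUNNING"
    FINISHED = "FINISHED"
    ERROR = "ERROR"


class MiniAgent:
    def __init__(self):
        self.state = AgentState.IDLE
        self.seen_running = False

    @asynccontextmanager
    async def state_context(self, new_state: AgentState):
        previous_state = self.state
        self.state = new_state
        try:
            yield
        except Exception:
            self.state = AgentState.ERROR  # Mark the failure...
            raise
        finally:
            self.state = previous_state    # ...but finally still restores the old state


async def demo():
    agent = MiniAgent()
    async with agent.state_context(AgentState.RUNNING):
        agent.seen_running = agent.state == AgentState.RUNNING
    return agent


agent = asyncio.run(demo())
print(agent.seen_running, agent.state.value)  # True IDLE
```

Inside the block the agent observes RUNNING; on exit it is back to IDLE, so a nested or subsequent run starts from a clean state.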
Loop control:

```python
while (
    self.current_step < self.max_steps and
    self.state != AgentState.FINISHED
):
    self.current_step += 1
    step_result = await self.step()
    # Stuck detection
    if self.is_stuck():
        self.handle_stuck_state()
```

Stuck detection:

```python
def is_stuck(self) -> bool:
    """Detect whether the agent is caught in a loop."""
    if len(self.memory.messages) < 2:
        return False
    last_message = self.memory.messages[-1]
    if not last_message.content:
        return False
    # Count earlier assistant messages with identical content
    duplicate_count = sum(
        1 for msg in reversed(self.memory.messages[:-1])
        if msg.role == "assistant" and msg.content == last_message.content
    )
    return duplicate_count >= self.duplicate_threshold
```

When a loop is detected, the agent prepends a prompt nudging the LLM to change strategy:

```python
def handle_stuck_state(self):
    stuck_prompt = (
        "Observed duplicate responses. Consider new strategies and avoid "
        "repeating ineffective paths already attempted."
    )
    self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
```
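The duplicate check is easy to verify in isolation. The sketch below uses plain `(role, content)` tuples instead of `Message` objects:

```python
def is_stuck(messages, duplicate_threshold=2):
    """messages: list of (role, content) tuples, oldest first."""
    if len(messages) < 2:
        return False
    last_role, last_content = messages[-1]
    if not last_content:
        return False
    duplicates = sum(
        1 for role, content in reversed(messages[:-1])
        if role == "assistant" and content == last_content
    )
    return duplicates >= duplicate_threshold


history = [
    ("user", "list the files"),
    ("assistant", "Running ls..."),
    ("assistant", "Running ls..."),
    ("assistant", "Running ls..."),
]
print(is_stuck(history))  # True: "Running ls..." repeats past the threshold
```

With the default threshold of 2, the check fires only once the same assistant output has appeared three times in total, so a single harmless repetition does not trigger it.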
Special tool handling:

Some tools carry special semantics. The `Terminate` tool, for example, ends the agent's run:

```python
async def _handle_special_tool(self, name: str, result: Any, **kwargs):
    if not self._is_special_tool(name):
        return
    if self._should_finish_execution(name=name, result=result, **kwargs):
        logger.info(f"Special tool '{name}' has completed the task!")
        self.state = AgentState.FINISHED
```
5.4 Context Awareness and Memory Management

The Memory mechanism:

The `Memory` class maintains the full conversation history:

```python
class Memory(BaseModel):
    messages: List[Message] = Field(default_factory=list)
    max_messages: int = Field(default=100)

    def add_message(self, message: Message) -> None:
        self.messages.append(message)
        # Cap the message count, keeping the most recent
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]
```

The history records the complete interaction sequence:

```
User: "Create a Python script for me"
Assistant: [reasoning] [tool_calls: python_execute]
Tool: [execution result]
Assistant: [analysis of the result] [tool_calls: str_replace_editor]
Tool: [file creation result]
Assistant: "The script has been created"
```
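The sliding-window truncation can be checked on its own. This sketch stores plain strings instead of `Message` objects:

```python
class SlidingMemory:
    """Keeps at most max_messages, dropping the oldest first."""

    def __init__(self, max_messages: int = 100):
        self.messages = []
        self.max_messages = max_messages

    def add_message(self, message: str) -> None:
        self.messages.append(message)
        if len(self.messages) > self.max_messages:
            self.messages = self.messages[-self.max_messages:]


mem = SlidingMemory(max_messages=3)
for msg in ["m1", "m2", "m3", "m4", "m5"]:
    mem.add_message(msg)
print(mem.messages)  # ['m3', 'm4', 'm5']
```

Keeping only the most recent messages bounds prompt size, at the cost of forgetting early context; production systems often summarize dropped messages instead of discarding them outright.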
Context construction:

The agent's context has three parts: the system prompt, the conversation history, and a next-step prompt that can be adjusted dynamically. In `Manus.think()`, for example, the next-step prompt changes when a browser session is active:

```python
async def think(self) -> bool:
    # Check whether the browser is in use
    browser_in_use = any(
        tc.function.name == BrowserUseTool().name
        for msg in recent_messages
        if msg.tool_calls
        for tc in msg.tool_calls
    )
    # If so, add browser context to the next-step prompt
    if browser_in_use:
        self.next_step_prompt = (
            await self.browser_context_helper.format_next_step_prompt()
        )
    return await super().think()
```

This dynamic context adjustment lets the agent supply more precise information for each decision, based on the current task state.
6. Architecture Diagrams and Data Flow

6.1 Agent Execution Flow (diagram not reproduced here)

6.2 Tool Call Sequence (diagram not reproduced here)

6.3 Class Hierarchy (diagram not reproduced here)

6.4 Data Flow (diagram not reproduced here)
7. Practical Advice

7.1 Designing New Tools

When designing a new tool, follow these principles:

- The `description` field should spell out the tool's purpose, usage scenarios, and limitations, since the LLM chooses tools based on it.
- Parameters should be defined with precise JSON Schema: types, descriptions, and required fields.
- Results should be returned as a `ToolResult` carrying either the success payload or an error message.

Example: a file search tool

```python
class FileSearchTool(BaseTool):
    name: str = "file_search"
    description: str = (
        "Search for files in a directory tree matching a pattern. "
        "Supports glob patterns and regex. Returns list of matching file paths."
    )
    parameters: dict = {
        "type": "object",
        "properties": {
            "directory": {
                "type": "string",
                "description": "Root directory to search in (absolute path)",
            },
            "pattern": {
                "type": "string",
                "description": "Search pattern (glob or regex)",
            },
            "recursive": {
                "type": "boolean",
                "description": "Whether to search recursively",
                "default": True,
            },
        },
        "required": ["directory", "pattern"],
    }

    async def execute(
        self, directory: str, pattern: str, recursive: bool = True
    ) -> ToolResult:
        try:
            # Validate path safety
            if not Path(directory).is_absolute():
                return self.fail_response("Directory must be absolute path")
            # Run the search
            matches = await self._search_files(directory, pattern, recursive)
            return self.success_response({"files": matches})
        except Exception as e:
            return self.fail_response(f"Search failed: {str(e)}")
```
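The `_search_files` helper is not shown above; one plausible implementation, using `pathlib` glob matching (an assumption for illustration, not the tool's actual code), looks like this:

```python
import tempfile
from pathlib import Path


def search_files(directory: str, pattern: str, recursive: bool = True) -> list:
    """Return file paths under `directory` whose names match the glob `pattern`."""
    root = Path(directory)
    matches = root.rglob(pattern) if recursive else root.glob(pattern)
    return sorted(str(p) for p in matches if p.is_file())


# Demonstrate on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "sub").mkdir()
    (Path(tmp) / "a.py").write_text("print('a')")
    (Path(tmp) / "sub" / "b.py").write_text("print('b')")
    (Path(tmp) / "notes.txt").write_text("hi")
    names = [Path(p).name for p in search_files(tmp, "*.py")]
    print(names)  # ['a.py', 'b.py']
```

In the real tool this function would be wrapped with `asyncio.to_thread` (or similar) so the blocking filesystem walk does not stall the agent's event loop.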
7.2 Optimizing Prompts

Prompt optimization is key to agent performance. Keep the system prompt focused on concrete capabilities and guardrails, make the next-step prompt action-oriented, and keep every tool description specific. Example: an improved system prompt

```python
SYSTEM_PROMPT = """You are OpenManus, a capable AI assistant.

Your capabilities:
- Execute Python code for data processing and analysis
- Browse the web to gather information
- Edit files using safe string replacement
- Interact with users when clarification is needed

Working directory: {directory}

Important guidelines:
- Always verify file paths before operations
- Use sandboxed execution for untrusted code
- Ask for confirmation before destructive operations
- Explain your reasoning at each step
"""
```
7.3 Debugging Agent Behavior

Debugging an agent calls for a systematic approach:

- Logging: add detailed logs at key points.
- Inspect `agent.memory.messages` to verify the conversation history is correct.
- Set `max_steps=1` to step through the agent's behavior one round at a time.

Example: a debugging helper

```python
async def debug_agent(agent: ToolCallAgent, request: str):
    """Trace an agent's execution step by step."""
    print(f"Request: {request}")
    print(f"Available tools: {[t.name for t in agent.available_tools.tools]}")
    # Single-step execution
    agent.max_steps = 1
    await agent.run(request)
    # Inspect memory
    print("\nMemory contents:")
    for i, msg in enumerate(agent.memory.messages):
        print(f"{i}. {msg.role}: {(msg.content or '')[:100]}")  # content may be None
        if msg.tool_calls:
            print(f"   Tool calls: {[tc.function.name for tc in msg.tool_calls]}")
```
7.4 Performance Tips

- Token management: count tokens before each LLM call and trim or summarize old history to stay within the context limit.
- Tool selection: keep the tool list small and the descriptions sharp so the LLM chooses quickly and correctly.
- Concurrency: run independent tool calls concurrently where their effects do not conflict.
- Caching: cache results of deterministic, expensive tools.

Example: tool result caching

```python
from datetime import datetime, timedelta
from typing import Any, Dict, Tuple


class CachedTool(BaseTool):
    _cache: Dict[str, Tuple[Any, datetime]] = {}
    _cache_ttl: timedelta = timedelta(minutes=5)

    async def execute(self, **kwargs) -> ToolResult:
        cache_key = str(sorted(kwargs.items()))
        # Check the cache
        if cache_key in self._cache:
            result, timestamp = self._cache[cache_key]
            if datetime.now() - timestamp < self._cache_ttl:
                return result
        # Execute the tool
        result = await self._execute_impl(**kwargs)
        # Update the cache
        self._cache[cache_key] = (result, datetime.now())
        return result
```
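A synchronous, self-contained version of the same idea can be verified directly (the `TTLCache` name and the fake search function are illustrative):

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLCache:
    """Memoize a function's results for ttl seconds."""

    def __init__(self, fn: Callable[..., Any], ttl: float = 300.0):
        self.fn = fn
        self.ttl = ttl
        self.store: Dict[str, Tuple[Any, float]] = {}
        self.misses = 0

    def __call__(self, **kwargs) -> Any:
        key = str(sorted(kwargs.items()))
        if key in self.store:
            value, stamp = self.store[key]
            if time.monotonic() - stamp < self.ttl:
                return value  # Cache hit: skip the underlying call
        self.misses += 1
        value = self.fn(**kwargs)
        self.store[key] = (value, time.monotonic())
        return value


search = TTLCache(lambda query: f"results for {query}", ttl=300.0)
print(search(query="agents"))  # results for agents
print(search(query="agents"))  # served from cache
print(search.misses)           # 1
```

Using `time.monotonic()` for expiry avoids surprises from wall-clock adjustments; caching only makes sense for tools whose output is deterministic over the TTL window.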
8. Conclusion

8.1 Core Elements of a Modern Agent

From our walkthrough of OpenManus, the core elements of a modern agent system are: an LLM with function calling for structured decision-making, a well-described tool set as the capability surface, a ReAct-style think-act loop, and memory plus state management to keep that loop grounded.

8.2 Strengths of the OpenManus Architecture

Its strengths follow directly from the design covered above: a clean four-layer separation of concerns, a uniform tool abstraction that supports both static configuration and dynamic (MCP) extension, and safety mechanisms such as step limits, stuck detection, and explicit state transitions.

8.3 Future Directions

Modern agent technology is still developing rapidly, and the patterns described here, tool protocols, context management, and loop control, will continue to evolve with it.

8.4 Closing Remarks

Building a modern agent is a systems-engineering effort that demands a solid grasp of LLM capabilities, tool design, and system architecture. OpenManus offers an excellent reference implementation, showing how to turn theory into practice. We hope this analysis helps readers understand the architecture and build agent systems of their own; agent technology is advancing quickly, and we look forward to more developers joining the field and pushing it forward.