1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
| import os
import asyncio
from typing import List, Optional, Dict, Type
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import Tool
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.memory import ConversationBufferMemory
# --- 0. 配置与初始化 ---
# 从 .env 文件加载 OPENAI_API_KEY。
load_dotenv()
# ChatOpenAI 客户端自动读取环境变量中的 API key。
llm = ChatOpenAI(temperature=0.5, model="gpt-4o-mini")
# --- 1. 任务管理系统 ---
class Task(BaseModel):
"""系统中的单个任务。"""
id: str
description: str
priority: Optional[str] = None # P0, P1, P2
assigned_to: Optional[str] = None # 工作人员姓名
class SuperSimpleTaskManager:
"""高效且健壮的内存任务管理器。"""
def __init__(self):
# 使用字典实现 O(1) 查找、更新和删除。
self.tasks: Dict[str, Task] = {}
self.next_task_id = 1
def create_task(self, description: str) -> Task:
"""创建并存储新任务。"""
task_id = f"TASK-{self.next_task_id:03d}"
new_task = Task(id=task_id, description=description)
self.tasks[task_id] = new_task
self.next_task_id += 1
print(f"DEBUG: 创建任务 - {task_id}: {description}")
return new_task
def update_task(self, task_id: str, **kwargs) -> Optional[Task]:
"""使用 Pydantic 的 model_copy 安全更新任务。"""
task = self.tasks.get(task_id)
if task:
update_data = {k: v for k, v in kwargs.items() if v is not None}
updated_task = task.model_copy(update=update_data)
self.tasks[task_id] = updated_task
print(f"DEBUG: 任务 {task_id} 更新为 {update_data}")
return updated_task
print(f"DEBUG: 未找到任务 {task_id},无法更新。")
return None
def list_all_tasks(self) -> str:
"""列出系统中的所有任务。"""
if not self.tasks:
return "系统中暂无任务。"
task_strings = []
for task in self.tasks.values():
task_strings.append(
f"ID: {task.id}, 描述:'{task.description}', "
f"优先级:{task.priority or 'N/A'}, "
f"分配给:{task.assigned_to or 'N/A'}"
)
return "当前任务列表:\n" + "\n".join(task_strings)
task_manager = SuperSimpleTaskManager()
# --- 2. 项目经理 Agent 工具 ---
# 使用 Pydantic 模型定义工具参数,提升校验和可读性。
class CreateTaskArgs(BaseModel):
description: str = Field(description="任务的详细描述。")
class PriorityArgs(BaseModel):
task_id: str = Field(description="要更新的任务 ID,例如 'TASK-001'。")
priority: str = Field(description="优先级,必须为 'P0'、'P1' 或 'P2'。")
class AssignWorkerArgs(BaseModel):
task_id: str = Field(description="要更新的任务 ID,例如 'TASK-001'。")
worker_name: str = Field(description="分配任务的工作人员姓名。")
def create_new_task_tool(description: str) -> str:
"""根据描述创建新项目任务。"""
task = task_manager.create_task(description)
return f"已创建任务 {task.id}: '{task.description}'。"
def assign_priority_to_task_tool(task_id: str, priority: str) -> str:
"""为指定任务分配优先级(P0、P1、P2)。"""
if priority not in ["P0", "P1", "P2"]:
return "优先级无效,必须为 P0、P1 或 P2。"
task = task_manager.update_task(task_id, priority=priority)
return f"已为任务 {task.id} 分配优先级 {priority}。" if task else f"未找到任务 {task_id}。"
def assign_task_to_worker_tool(task_id: str, worker_name: str) -> str:
"""将任务分配给指定工作人员。"""
task = task_manager.update_task(task_id, assigned_to=worker_name)
return f"已将任务 {task.id} 分配给 {worker_name}。" if task else f"未找到任务 {task_id}。"
# 项目经理 Agent 可用的所有工具
pm_tools = [
Tool(
name="create_new_task",
func=create_new_task_tool,
description="首先用于创建新任务并获取任务 ID。",
args_schema=CreateTaskArgs
),
Tool(
name="assign_priority_to_task",
func=assign_priority_to_task_tool,
description="任务创建后用于分配优先级。",
args_schema=PriorityArgs
),
Tool(
name="assign_task_to_worker",
func=assign_task_to_worker_tool,
description="任务创建后用于分配给指定工作人员。",
args_schema=AssignWorkerArgs
),
Tool(
name="list_all_tasks",
func=task_manager.list_all_tasks,
description="用于列出所有当前任务及状态。"
),
]
# --- 3. 项目经理 Agent 定义 ---
pm_prompt_template = ChatPromptTemplate.from_messages([
("system", """你是一名专注的项目经理 LLM Agent,目标是高效管理项目任务。
当收到新任务请求时,请按以下步骤操作:
1. 首先使用 `create_new_task` 工具创建任务并获取 `task_id`。
2. 分析用户请求,判断是否提及优先级或分配人员。
- 如果提到优先级(如“紧急”、“ASAP”、“关键”),映射为 P0,使用 `assign_priority_to_task`。
- 如果提到工作人员,则使用 `assign_task_to_worker`。
3. 如信息(优先级、分配人员)缺失,需合理默认分配(如优先级设为 P1,分配给 'Worker A')。
4. 任务处理完毕后,使用 `list_all_tasks` 展示最终状态。
可用工作人员:'Worker A'、'Worker B'、'Review Team'
优先级:P0(最高)、P1(中)、P2(最低)
"""),
("placeholder", "{chat_history}"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}")
])
# 创建 Agent 执行器
pm_agent = create_react_agent(llm, pm_tools, pm_prompt_template)
pm_agent_executor = AgentExecutor(
agent=pm_agent,
tools=pm_tools,
verbose=True,
handle_parsing_errors=True,
memory=ConversationBufferMemory(memory_key="chat_history", return_messages=True)
)
# --- 4. 简单交互流程 ---
async def run_simulation():
print("--- 项目经理 Agent 模拟 ---")
# 场景 1:处理紧急新功能请求
print("\n[用户请求] 需要 ASAP 实现新的登录系统,分配给 Worker B。")
await pm_agent_executor.ainvoke({"input": "创建一个实现新登录系统的任务。很紧急,分配给 Worker B。"})
print("\n" + "-"*60 + "\n")
# 场景 2:处理不太紧急的内容更新请求
print("[用户请求] 需要审核营销网站内容。")
await pm_agent_executor.ainvoke({"input": "管理一个新任务:审核营销网站内容。"})
print("\n--- 模拟结束 ---")
# 运行模拟
if __name__ == "__main__":
asyncio.run(run_simulation())
|