AI项目实战(11)多智能体研究分析平台

《AI Agent 实战》系列 · 多智能体研究分析平台

Posted by Ryan on 2026-07-03
Estimated Reading Time 49 Minutes
Words 11.6k In Total
Viewed Times

主讲能力:多研究员并行、观点辩论、综合报告
业务场景:用户提出研究问题,多个 Agent 从技术、商业、用户三个视角独立分析,再由辩论 Agent 找出共识与分歧,最后生成结构化研究报告。
对应代码backend/projects/p08_research_analysis/


开篇:一场吵不出结果的产品会

周二下午,三楼会议室。CEO 把一杯咖啡往桌上一放:“我们到底要不要做 AI 视频生成?给个结论。”

技术负责人老周第一个开口:“技术上没问题。扩散模型已经成熟,开源方案一大堆,给我三个月,demo 能跑。”

话音没落,商业总监 Lisa 翻了个白眼:“技术是没问题,可市场呢?头部那几家烧了几亿美金,我们这点预算冲进去,连个水花都砸不出来。”

用研的小陈慢吞吞补了一刀:“我上周访谈了 20 个创作者,他们要的是‘省时间’,不是‘炫技’。现有工具够用了,付费意愿很弱。”

三个人,三个方向,谁也说服不了谁。会开了两小时,结论是:下周再议。

散会后,CEO 拉住你叹气:“要是能找个分析师给我个痛快话就好了。”

可问题恰恰在这里——如果只找一个分析师,他大概率会端出一个“平均答案”:技术可行,但市场有风险,用户需求中等,建议谨慎推进。 听着四平八稳,其实什么也没说。技术可行性有多高?市场风险大到什么程度?用户需求“中等”又是什么鬼?

真正有价值的,不是这个和稀泥的结论,而是刚才会议室里那种张力本身——三个视角各自的最大化表达,以及它们相互碰撞后暴露出来的矛盾。于是你有了主意:与其让一个分析师去调和,不如让多个研究员 Agent 各自独立研究,再把它们拉到一起辩论,最后综合成一份既保留张力、又给出判断的报告

这就是本章的项目八。

💡 本章灵魂:多视角对抗,而非流水线协作
第 9 章的 Agent 是“接力赛”——上一个把棒交给下一个,彼此信任、各司其职。本章的 Agent 是“辩论赛”——谁也不轻信谁,非得把矛盾摆到台面上,辩一辩、掂一掂,才肯下结论。
好的决策不是给一个答案,而是让不同视角充分表达,再形成可解释的综合判断。


10.1 那场吵翻的会

会议室里的那场僵局,本质上是一个“多视角决策”问题。复杂问题很少只有一个正确答案,关键也不在于谁对谁错,而在于不同视角是否都被充分表达了。把这套逻辑搬到一个研究分析平台上,需求就清晰了:

本项目构建一个研究分析平台,让多个研究员 Agent 独立产出观点,再由辩论和综合环节形成结论。

功能需求

  • FR-1:支持用户输入开放式研究问题。
  • FR-2:技术、商业、用户三个研究员并行分析。
  • FR-3:辩论节点找出矛盾、共识、证据不足之处。
  • FR-4:综合节点输出决策建议、风险清单、下一步行动。
  • FR-5:保留每个研究员原始输出,便于追溯。

10.2 画个样子:它该长啥样

需求清楚了,流程怎么走?一张图胜过千言万语:

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
    Q["研究问题"] --> Tech["技术研究员"]
    Q --> Biz["商业研究员"]
    Q --> User["用户研究员"]
    Tech --> Debate["辩论 Agent"]
    Biz --> Debate
    User --> Debate
    Debate --> Synth["综合 Agent"]
    Synth --> Report["研究报告"]

    classDef r fill:#e0f7fa,stroke:#00acc1,stroke-width:2px,color:#006064
    classDef d fill:#ede7f6,stroke:#5e35b1,stroke-width:2px,color:#311b92
    classDef o fill:#e8f5e9,stroke:#43a047,stroke-width:2px,color:#1b5e20
    class Tech,Biz,User r
    class Debate,Synth d
    class Report o

三个研究员同时接到同一个问题,各自闷头研究;研究完不急着下结论,先交给辩论 Agent 把共识、分歧、盲区扒出来;最后综合 Agent 拿着这些“辩过”的材料,写出一份报告。

注意这条流水线的形状:前面是扇出(一个问题分给三个人),中间是收束(三份报告汇成一个辩论),最后再收成一份结论。这种“先散后收”的形状,正是多视角对抗的物理结构——没有前面的散,就没有后面的真;没有中间的收,散就只是一盘散沙。


10.3 拆开看:怎么造出来

研究员怎么落地?这里有个关键选择:是用三个独立 Agent,还是用“工具化研究员”?本项目选了后者——每个研究员是一个工具函数,内部调用模型完成特定视角的分析,结果存入 ResearchStore(生产环境走 ResearchRepository 落 SQLite)。

为什么要“工具化”?因为研究员的本质不是“自主决策”,而是“按固定框架执行一次分析”。把它做成工具,既保留了视角的独立性(每个工具有自己的 Prompt),又把调度权交给首席研究员 Agent,让它决定何时调谁。这种模式有三个好处:

  • 每个视角 Prompt 独立,职责清晰,互不污染。
  • 原始研究结果可追踪,结论追溯到视角、视角追溯到 Prompt。
  • 后续可平滑替换为真实 Web Search / RAG / 数据库分析工具,服务层一行不用改。

⚠️ 避坑:本书代码的研究员目前只调模型,没接真实数据源。 也就是说,商业研究员嘴里的“TAM/SAM/SOM”、技术研究员嘴里的“成功率/研发周期”,都是模型凭训练知识“编”的,不是实时检索来的。生产版本必须接入真实搜索、论文数据库、商业数据库并标注引用来源,否则报告再漂亮也是空中楼阁。本书当前代码刻意保留了接口边界,正是为了这一天。


10.4 动手写:三层架构完整代码

理论说够了,上代码。本节给出多智能体研究分析平台的完整实现。项目采用分层架构——说白了就是把“定义数据”“写 Prompt”“跑工具”“存数据”“编排流程”“对外暴露”这几件事分别关进不同的房间,谁也别越界。这样每层都能单独演进、单独测试。

10.4.1 三层架构完整代码

项目按职责划分为以下层次,每一层都对应一个独立文件:

层次 文件 职责
模型层 models.py 研究角色、研究发现、辩论结果、报告模型
提示词层 prompts.py 各研究员系统提示词
工具层 tools.py 研究/辩论/综合工具函数
仓储层 repositories.py SQLite 持久化存储
服务层 service.py 并行研究、辩论、综合工作流编排
项目层 project.py 项目注册、对外接口
入口 init.py 注册与 re-export

下面按层次依次给出每个文件的完整代码。读的时候不妨带着一个问题:如果要把“顺序研究”升级成“真并行”,或者把“调模型”换成“调搜索引擎”,分别该动哪一层? 答案就藏在分层里。

models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""项目八:数据模型层。

定义多智能体研究分析平台的领域模型和数据结构。
"""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class ResearchRole(str, Enum):
"""研究员角色枚举。"""
TECHNICAL = "technical" # 技术视角
BUSINESS = "business" # 商业视角
USER = "user" # 用户视角
DEBATER = "debater" # 辩论主持人
SYNTHESIZER = "synthesizer" # 综合报告撰写人


class ResearchStatus(str, Enum):
"""研究状态枚举。"""
PENDING = "pending"
RESEARCHING = "researching"
DEBATING = "debating"
SYNTHESIZING = "synthesizing"
COMPLETED = "completed"
FAILED = "failed"


@dataclass
class ResearchFinding:
"""单个研究员的研究发现。"""
role: ResearchRole
perspective: str # 视角名称,如"技术可行性"、"市场前景"
content: str
confidence: float = 0.8 # 置信度 0-1
created_at: str = field(default_factory=lambda: datetime.now().isoformat())

def summary(self, max_length: int = 200) -> str:
"""获取摘要。"""
if len(self.content) <= max_length:
return self.content
return self.content[:max_length] + "..."


@dataclass
class DebateResult:
"""辩论结果。"""
consensus_points: list[str] = field(default_factory=list)
contradiction_points: list[str] = field(default_factory=list)
blind_spots: list[str] = field(default_factory=list)
unique_contributions: dict[str, str] = field(default_factory=dict)
overall_assessment: str = ""

def to_markdown(self) -> str:
"""转换为 Markdown 格式。"""
lines = ["# 辩论结果", ""]

lines.extend(["## 共识点", ""])
for point in self.consensus_points:
lines.append(f"- {point}")

lines.extend(["", "## 分歧点", ""])
for point in self.contradiction_points:
lines.append(f"- {point}")

lines.extend(["", "## 研究盲区", ""])
for point in self.blind_spots:
lines.append(f"- {point}")

lines.extend(["", "## 各方独特贡献", ""])
for role, contribution in self.unique_contributions.items():
lines.append(f"- **{role}**: {contribution}")

lines.extend(["", "## 总体评价", "", self.overall_assessment])

return "\n".join(lines)


@dataclass
class FinalReport:
"""最终研究报告。"""
title: str
executive_summary: str
background: str
multi_perspective_analysis: str
consensus_and_debate: str
conclusions: list[str] = field(default_factory=list)
recommendations: list[str] = field(default_factory=list)
limitations: list[str] = field(default_factory=list)

def to_markdown(self) -> str:
"""转换为完整 Markdown 报告。"""
lines = [f"# {self.title}", ""]

lines.extend(["## 执行摘要", "", self.executive_summary])
lines.extend(["", "## 研究背景", "", self.background])
lines.extend(["", "## 多视角分析", "", self.multi_perspective_analysis])
lines.extend(["", "## 共识与分歧", "", self.consensus_and_debate])

lines.extend(["", "## 研究结论", ""])
for conclusion in self.conclusions:
lines.append(f"- {conclusion}")

lines.extend(["", "## 建议", ""])
for recommendation in self.recommendations:
lines.append(f"- {recommendation}")

lines.extend(["", "## 局限性", ""])
for limitation in self.limitations:
lines.append(f"- {limitation}")

return "\n".join(lines)


@dataclass
class ResearchTask:
"""研究任务。"""
task_id: str
question: str
status: ResearchStatus = ResearchStatus.PENDING
findings: list[ResearchFinding] = field(default_factory=list)
debate_result: DebateResult | None = None
final_report: FinalReport | None = None
started_at: str = field(default_factory=lambda: datetime.now().isoformat())
completed_at: str = ""

def add_finding(self, role: ResearchRole, perspective: str, content: str) -> None:
"""添加研究发现。"""
self.findings.append(ResearchFinding(
role=role,
perspective=perspective,
content=content,
))

def get_findings_by_role(self, role: ResearchRole) -> ResearchFinding | None:
"""按角色获取研究发现。"""
for finding in self.findings:
if finding.role == role:
return finding
return None

def all_findings_text(self) -> str:
"""获取所有研究发现的合并文本。"""
if not self.findings:
return "尚无研究结果。"

parts = []
for finding in self.findings:
parts.append(f"=== {finding.perspective} ===\n{finding.content}")

return "\n\n".join(parts)

def mark_completed(self) -> None:
"""标记任务完成。"""
self.status = ResearchStatus.COMPLETED
self.completed_at = datetime.now().isoformat()

prompts.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""项目八:Prompt 层。

定义多智能体研究分析平台各研究员的系统提示词。
"""
from __future__ import annotations

# 首席研究员(总控)系统提示词
LEAD_RESEARCHER_PROMPT = """你是研究分析平台的首席研究员,负责协调研究团队完成深度分析。

## 你的团队
- 🔬 research_technical — 技术视角研究员
- 💼 research_business — 商业视角研究员
- 👤 research_user — 用户视角研究员
- ⚖️ debate_findings — 辩论主持人
- 📊 synthesize_report — 综合报告撰写

## 工作流程(严格执行)
1. **并行研究阶段**:同时调用三个研究员工具,从不同视角分析同一问题
2. **辩论阶段**:调用 debate_findings 对比各方观点,找出共识与分歧
3. **综合阶段**:调用 synthesize_report 生成最终研究报告

## 报告要求
- 结构化清晰,有明确的章节标题
- 每个观点注明来源(哪个视角的研究员)
- 数据支撑充分,避免空泛论述
- 结论明确,建议可执行
- 客观中立,不偏不倚

## 最终交付格式
```markdown
# 研究报告:[问题]

## 执行摘要
(200字以内的核心发现和建议)

## 多视角分析
### 技术视角
(内容)

### 商业视角
(内容)

### 用户视角
(内容)

## 共识与分歧
### 各方共识
- (共识点)

### 主要分歧
- (分歧点)

## 结论与建议
### 核心结论
- (结论)

### 行动建议
- (建议)
```"""


# 技术研究员提示词
TECHNICAL_RESEARCHER_PROMPT = """你是资深技术研究员,擅长从技术可行性、架构设计、实现难度角度分析问题。

## 分析框架
1. **技术可行性评估**:现有技术是否能支撑?技术成熟度如何?
2. **架构设计考量**:需要什么样的系统架构?关键组件有哪些?
3. **实现难度分析**:研发周期、团队要求、技术难点在哪里?
4. **技术风险识别**:可能遇到哪些技术挑战?如何应对?
5. **替代方案比较**:是否有其他技术路线?各有什么优缺点?

## 输出要求
- 给出具体的技术评估指标(如成功率、研发周期等)
- 用数据支撑观点,避免空泛判断
- 风险等级用高/中/低标注,并说明理由
- 结论明确,有具体的建议

请分析以下问题:
{question}"""


# 商业研究员提示词
BUSINESS_RESEARCHER_PROMPT = """你是资深商业研究员,擅长从市场、商业模式、成本收益角度分析问题。

## 分析框架
1. **市场规模与增长**:目标市场多大?增长趋势如何?
2. **商业模式分析**:如何盈利?盈利周期多长?
3. **成本收益评估**:投入产出比如何?回收期多长?
4. **竞争格局分析**:主要竞争对手是谁?我们的差异化优势?
5. **商业风险识别**:政策风险、市场风险、财务风险等

## 输出要求
- 给出具体的市场数据(TAM/SAM/SOM)
- 商业模式图用文字清晰描述
- 成本收益用具体数字估算
- 竞争分析用 SWOT 结构
- 投资建议明确:建议/谨慎/不建议

请分析以下问题:
{question}"""


# 用户研究员提示词
USER_RESEARCHER_PROMPT = """你是资深用户研究员,擅长从用户需求、使用场景、痛点角度分析问题。

## 分析框架
1. **目标用户画像**:谁是核心用户?他们的特征是什么?
2. **用户需求分析**:用户真正需要什么?表层需求 vs 深层需求?
3. **使用场景分析**:用户在什么场景下使用?频率、时长如何?
4. **痛点与爽点**:当前解决方案有什么问题?我们能提供什么惊喜?
5. **用户体验要求**:用户对产品体验有什么期望?

## 输出要求
- 用户画像具体可感(有姓名、年龄、职业、痛点)
- 用用户故事(User Story)描述场景
- 痛点按严重程度排序
- 给出具体的用户体验指标建议
- 从用户视角评价方案的吸引力

请分析以下问题:
{question}"""


# 辩论主持人提示词
DEBATER_PROMPT = """你是辩论主持人,负责对比各方研究结果,找出矛盾、共识与盲区。

## 已完成的研究结果
{all_findings}

## 辩论框架
1. **共识点识别**:各方都同意的观点有哪些?
2. **矛盾点分析**:哪些地方各方观点不一致?为什么?
3. **独特贡献总结**:每个视角带来了什么独特价值?
4. **研究盲区发现**:有哪些重要问题还没有被覆盖?
5. **总体评估**:综合来看,研究质量如何?还需要补充什么?

## 输出要求
- 矛盾点要具体指出哪两方在什么问题上观点不同
- 每个盲区要说明为什么重要以及建议如何补充研究
- 语言客观中立,不站队
- 结构清晰,便于后续写报告

请给出你的辩论分析结果。"""


# 综合报告撰写人提示词
SYNTHESIZER_PROMPT = """你是首席报告撰写人,负责基于研究结果和辩论,生成高质量的综合研究报告。

## 输入材料
### 研究结果
{all_findings}

### 辩论结果
{debate_result}

## 报告结构要求
1. **标题**:简洁醒目,概括研究主题
2. **执行摘要**:200字以内,核心发现 + 关键建议
3. **研究背景**:为什么要做这个研究?研究目标是什么?
4. **多视角分析**:整合三个研究员的发现,去重存精
5. **共识与分歧**:基于辩论结果,说明各方共识和主要分歧
6. **核心结论**:3-5条明确的结论
7. **行动建议**:3-5条可执行的建议,分优先级
8. **局限性说明**:本研究的局限和未来研究方向

## 写作要求
- 语言专业、客观、简练
- 每个重要观点注明来源(哪个视角的研究员)
- 结论要有充分论据支撑
- 建议要具体、可落地、分优先级
- 格式美观,使用 Markdown 标题层级

请撰写这份研究报告。"""

tools.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""项目八:工具层。

定义多智能体研究分析平台各研究员的工具函数。
"""
from __future__ import annotations

from typing import Any

from langchain.tools import tool

from core import build_chat_model
from core.logging_conf import get_logger

from .models import ResearchRole
from .prompts import (
BUSINESS_RESEARCHER_PROMPT,
DEBATER_PROMPT,
SYNTHESIZER_PROMPT,
TECHNICAL_RESEARCHER_PROMPT,
USER_RESEARCHER_PROMPT,
)

logger = get_logger("p08.research.tools")

# 内存中的研究存储(用于向后兼容)
_in_memory_findings: dict[str, str] = {}
_in_memory_debate = ""


@tool
def research_technical(question: str) -> str:
"""技术视角研究员:从技术可行性、架构、实现难度角度分析问题。

Args:
question: 需要研究的问题

Returns:
技术视角分析报告
"""
logger.info("技术研究员开始分析: %s", question)

prompt = TECHNICAL_RESEARCHER_PROMPT.format(question=question)

try:
model = build_chat_model()
result = model.invoke(prompt)
content = result.content if hasattr(result, "content") else str(result)

_in_memory_findings[ResearchRole.TECHNICAL] = content
logger.info("技术研究完成,长度: %d", len(content))
return content
except Exception as e:
logger.error("技术研究失败: %s", e)
return f"❌ 技术研究失败: {e}"


@tool
def research_business(question: str) -> str:
"""商业视角研究员:从市场、商业模式、成本收益角度分析问题。

Args:
question: 需要研究的问题

Returns:
商业视角分析报告
"""
logger.info("商业研究员开始分析: %s", question)

prompt = BUSINESS_RESEARCHER_PROMPT.format(question=question)

try:
model = build_chat_model()
result = model.invoke(prompt)
content = result.content if hasattr(result, "content") else str(result)

_in_memory_findings[ResearchRole.BUSINESS] = content
logger.info("商业研究完成,长度: %d", len(content))
return content
except Exception as e:
logger.error("商业研究失败: %s", e)
return f"❌ 商业研究失败: {e}"


@tool
def research_user(question: str) -> str:
"""用户视角研究员:从用户体验、需求、痛点角度分析问题。

Args:
question: 需要研究的问题

Returns:
用户视角分析报告
"""
logger.info("用户研究员开始分析: %s", question)

prompt = USER_RESEARCHER_PROMPT.format(question=question)

try:
model = build_chat_model()
result = model.invoke(prompt)
content = result.content if hasattr(result, "content") else str(result)

_in_memory_findings[ResearchRole.USER] = content
logger.info("用户研究完成,长度: %d", len(content))
return content
except Exception as e:
logger.error("用户研究失败: %s", e)
return f"❌ 用户研究失败: {e}"


@tool
def debate_findings() -> str:
"""辩论主持人:对比各方研究结果,找出矛盾点和共识点。

Returns:
辩论结果报告
"""
logger.info("辩论主持人开始工作")

# 收集所有研究发现
findings_list = []
for role, content in _in_memory_findings.items():
role_name = {
ResearchRole.TECHNICAL: "技术视角",
ResearchRole.BUSINESS: "商业视角",
ResearchRole.USER: "用户视角",
}.get(role, str(role))
findings_list.append(f"=== {role_name} ===\n{content}")

if not findings_list:
return "⚠️ 需要先完成至少一项研究才能进行辩论。"

all_findings = "\n\n".join(findings_list)
prompt = DEBATER_PROMPT.format(all_findings=all_findings[:10000])

try:
model = build_chat_model()
result = model.invoke(prompt)
debate = result.content if hasattr(result, "content") else str(result)

global _in_memory_debate
_in_memory_debate = debate

logger.info("辩论完成")
return debate
except Exception as e:
logger.error("辩论失败: %s", e)
return f"❌ 辩论失败: {e}"


@tool
def synthesize_report() -> str:
"""综合报告撰写人:基于所有研究和辩论结果,生成最终研究报告。

Returns:
完整的研究报告(Markdown 格式)
"""
logger.info("报告撰写人开始工作")

# 收集所有研究发现
findings_list = []
for role, content in _in_memory_findings.items():
role_name = {
ResearchRole.TECHNICAL: "技术视角",
ResearchRole.BUSINESS: "商业视角",
ResearchRole.USER: "用户视角",
}.get(role, str(role))
findings_list.append(f"=== {role_name} ===\n{content}")

if not findings_list:
return "⚠️ 需要先完成研究才能生成报告。"

all_findings = "\n\n".join(findings_list)
debate_result = _in_memory_debate or "(辩论尚未进行)"

prompt = SYNTHESIZER_PROMPT.format(
all_findings=all_findings[:8000],
debate_result=debate_result[:2000],
)

try:
model = build_chat_model()
result = model.invoke(prompt)
report = result.content if hasattr(result, "content") else str(result)

logger.info("报告生成完成,长度: %d", len(report))
return report
except Exception as e:
logger.error("报告生成失败: %s", e)
return f"❌ 报告生成失败: {e}"


@tool
def get_all_findings() -> str:
"""获取所有已完成的研究发现摘要。

Returns:
研究发现摘要
"""
if not _in_memory_findings:
return "📋 尚无研究结果。请先调用研究工具进行分析。"

lines = ["📋 已完成的研究发现:", ""]
for role, content in _in_memory_findings.items():
role_name = {
ResearchRole.TECHNICAL: "🔬 技术视角",
ResearchRole.BUSINESS: "💼 商业视角",
ResearchRole.USER: "👤 用户视角",
}.get(role, str(role))
preview = content[:100].replace("\n", " ")
lines.append(f"{role_name}: {preview}...")

if _in_memory_debate:
lines.extend(["", "⚖️ 辩论已完成"])

return "\n".join(lines)


@tool
def clear_research() -> str:
"""清空当前研究数据,开始新的研究。

Returns:
操作结果
"""
_in_memory_findings.clear()
global _in_memory_debate
_in_memory_debate = ""
logger.info("研究数据已清空")
return "✅ 研究数据已清空,可以开始新的研究。"


def get_all_tools() -> list[Any]:
"""获取所有可用工具列表。

Returns:
工具对象列表
"""
return [
research_technical,
research_business,
research_user,
debate_findings,
synthesize_report,
get_all_findings,
clear_research,
]

repositories.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
"""项目八:仓储层。

定义研究数据的持久化存储和检索接口。
"""
from __future__ import annotations

import json
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import Any

from core.config import DATA_DIR
from core.logging_conf import get_logger

from .models import DebateResult, FinalReport, ResearchFinding, ResearchTask

logger = get_logger("p08.research.repository")

DB_PATH = DATA_DIR / "research_analysis.db"


class ResearchRepository:
"""研究数据仓储。

提供研究任务、发现、辩论结果、报告的持久化存储和检索。
"""

def __init__(self, db_path: Path | None = None) -> None:
self._db_path = db_path or DB_PATH
self._init_db()

def _init_db(self) -> None:
"""初始化数据库表结构。"""
conn = sqlite3.connect(str(self._db_path))
try:
# 任务表
conn.execute("""
CREATE TABLE IF NOT EXISTS research_tasks (
task_id TEXT PRIMARY KEY,
question TEXT NOT NULL,
status TEXT NOT NULL,
started_at TEXT NOT NULL,
completed_at TEXT
)
""")

# 研究发现表
conn.execute("""
CREATE TABLE IF NOT EXISTS research_findings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
role TEXT NOT NULL,
perspective TEXT NOT NULL,
content TEXT NOT NULL,
confidence REAL NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY (task_id) REFERENCES research_tasks(task_id)
)
""")

# 辩论结果表
conn.execute("""
CREATE TABLE IF NOT EXISTS debate_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
consensus_json TEXT,
contradiction_json TEXT,
blind_spots_json TEXT,
contributions_json TEXT,
assessment TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (task_id) REFERENCES research_tasks(task_id)
)
""")

# 最终报告表
conn.execute("""
CREATE TABLE IF NOT EXISTS final_reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
title TEXT NOT NULL,
executive_summary TEXT,
background TEXT,
multi_perspective TEXT,
consensus_debate TEXT,
conclusions_json TEXT,
recommendations_json TEXT,
limitations_json TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (task_id) REFERENCES research_tasks(task_id)
)
""")

conn.commit()
logger.info("数据库初始化完成: %s", self._db_path)
finally:
conn.close()

def save_task(self, task: ResearchTask) -> None:
"""保存研究任务。

Args:
task: 研究任务对象
"""
conn = sqlite3.connect(str(self._db_path))
try:
# 保存任务基本信息
conn.execute(
"""
INSERT OR REPLACE INTO research_tasks
(task_id, question, status, started_at, completed_at)
VALUES (?, ?, ?, ?, ?)
""",
(
task.task_id,
task.question,
task.status.value,
task.started_at,
task.completed_at or None,
),
)

# 删除旧的发现,然后保存新的
conn.execute("DELETE FROM research_findings WHERE task_id = ?", (task.task_id,))
for finding in task.findings:
conn.execute(
"""
INSERT INTO research_findings
(task_id, role, perspective, content, confidence, created_at)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
task.task_id,
finding.role.value,
finding.perspective,
finding.content,
finding.confidence,
finding.created_at,
),
)

# 保存辩论结果
if task.debate_result:
conn.execute("DELETE FROM debate_results WHERE task_id = ?", (task.task_id,))
debate = task.debate_result
conn.execute(
"""
INSERT INTO debate_results
(task_id, consensus_json, contradiction_json, blind_spots_json,
contributions_json, assessment, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
task.task_id,
json.dumps(debate.consensus_points, ensure_ascii=False),
json.dumps(debate.contradiction_points, ensure_ascii=False),
json.dumps(debate.blind_spots, ensure_ascii=False),
json.dumps(debate.unique_contributions, ensure_ascii=False),
debate.overall_assessment,
datetime.now().isoformat(),
),
)

# 保存最终报告
if task.final_report:
conn.execute("DELETE FROM final_reports WHERE task_id = ?", (task.task_id,))
report = task.final_report
conn.execute(
"""
INSERT INTO final_reports
(task_id, title, executive_summary, background,
multi_perspective, consensus_debate,
conclusions_json, recommendations_json, limitations_json, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
task.task_id,
report.title,
report.executive_summary,
report.background,
report.multi_perspective_analysis,
report.consensus_and_debate,
json.dumps(report.conclusions, ensure_ascii=False),
json.dumps(report.recommendations, ensure_ascii=False),
json.dumps(report.limitations, ensure_ascii=False),
datetime.now().isoformat(),
),
)

conn.commit()
logger.info("任务已保存: %s", task.task_id)
finally:
conn.close()

def get_task(self, task_id: str) -> ResearchTask | None:
"""获取研究任务。

Args:
task_id: 任务 ID

Returns:
任务对象,不存在返回 None
"""
from .models import ResearchRole, ResearchStatus

conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
try:
task_row = conn.execute(
"SELECT * FROM research_tasks WHERE task_id = ?",
(task_id,),
).fetchone()

if not task_row:
return None

task = ResearchTask(
task_id=task_row["task_id"],
question=task_row["question"],
status=ResearchStatus(task_row["status"]),
started_at=task_row["started_at"],
completed_at=task_row["completed_at"] or "",
)

# 加载研究发现
findings = conn.execute(
"SELECT * FROM research_findings WHERE task_id = ?",
(task_id,),
).fetchall()

for row in findings:
task.findings.append(ResearchFinding(
role=ResearchRole(row["role"]),
perspective=row["perspective"],
content=row["content"],
confidence=row["confidence"],
created_at=row["created_at"],
))

# 加载辩论结果
debate_row = conn.execute(
"SELECT * FROM debate_results WHERE task_id = ?",
(task_id,),
).fetchone()

if debate_row:
task.debate_result = DebateResult(
consensus_points=json.loads(debate_row["consensus_json"] or "[]"),
contradiction_points=json.loads(debate_row["contradiction_json"] or "[]"),
blind_spots=json.loads(debate_row["blind_spots_json"] or "[]"),
unique_contributions=json.loads(debate_row["contributions_json"] or "{}"),
overall_assessment=debate_row["assessment"] or "",
)

# 加载最终报告
report_row = conn.execute(
"SELECT * FROM final_reports WHERE task_id = ?",
(task_id,),
).fetchone()

if report_row:
task.final_report = FinalReport(
title=report_row["title"],
executive_summary=report_row["executive_summary"] or "",
background=report_row["background"] or "",
multi_perspective_analysis=report_row["multi_perspective"] or "",
consensus_and_debate=report_row["consensus_debate"] or "",
conclusions=json.loads(report_row["conclusions_json"] or "[]"),
recommendations=json.loads(report_row["recommendations_json"] or "[]"),
limitations=json.loads(report_row["limitations_json"] or "[]"),
)

return task
finally:
conn.close()

def list_tasks(self, limit: int = 20) -> list[dict[str, Any]]:
"""列出最近的研究任务。

Args:
limit: 返回数量限制

Returns:
任务摘要列表
"""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT task_id, question, status, started_at, completed_at
FROM research_tasks
ORDER BY started_at DESC
LIMIT ?
""",
(limit,),
).fetchall()

return [dict(row) for row in rows]
finally:
conn.close()

def search_tasks(self, keyword: str) -> list[dict[str, Any]]:
"""搜索研究任务。

Args:
keyword: 搜索关键词

Returns:
匹配的任务摘要列表
"""
conn = sqlite3.connect(str(self._db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(
"""
SELECT task_id, question, status, started_at, completed_at
FROM research_tasks
WHERE question LIKE ?
ORDER BY started_at DESC
LIMIT 20
""",
(f"%{keyword}%",),
).fetchall()

return [dict(row) for row in rows]
finally:
conn.close()


# 全局仓储实例
_repository: ResearchRepository | None = None


def get_repository() -> ResearchRepository:
"""获取研究仓储实例。

Returns:
ResearchRepository 单例
"""
global _repository
if _repository is None:
_repository = ResearchRepository()
return _repository


class ResearchStore:
"""内存研究存储(向后兼容)。

保留以维持旧导入兼容:
from projects.p08_research_analysis import ResearchStore

新代码请使用 ResearchRepository(SQLite 持久化)。
本类提供原单文件版本的内存存储接口,便于平滑迁移。
"""

def __init__(self) -> None:
self._findings: dict[str, str] = {}
self._debate: str = ""
self._final: str = ""

def add_finding(self, researcher: str, content: str) -> None:
"""添加研究发现。"""
self._findings[researcher] = content

def all_findings(self) -> str:
"""获取所有研究发现合并文本。"""
if not self._findings:
return "尚无研究结果。"
parts = []
for name, content in self._findings.items():
parts.append(f"=== {name} ===\n{content[:2000]}")
return "\n\n".join(parts)

service.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
"""项目八:服务层。

封装多智能体研究分析平台的核心业务逻辑。
"""
from __future__ import annotations

from typing import Any

from langchain.agents import create_agent

from core import build_chat_model
from core.logging_conf import get_logger

from .models import ResearchRole, ResearchStatus, ResearchTask
from .prompts import LEAD_RESEARCHER_PROMPT
from .repositories import get_repository
from .tools import get_all_tools

logger = get_logger("p08.research.service")


class ResearchAnalysisService:
"""研究分析服务类。

封装多智能体研究分析的核心业务逻辑:
- 创建和管理研究任务
- 协调各研究员并行工作
- 管理辩论和报告生成流程
- 持久化存储研究结果
"""

def __init__(self) -> None:
self._agent: Any | None = None
self._repository = get_repository()

def build_agent(self) -> Any:
"""构建首席研究员 Agent。

Returns:
LangChain Agent 对象
"""
if self._agent is not None:
return self._agent

self._agent = create_agent(
model=build_chat_model(),
tools=get_all_tools(),
system_prompt=LEAD_RESEARCHER_PROMPT,
)
return self._agent

def create_task(self, question: str, task_id: str | None = None) -> ResearchTask:
"""创建研究任务。

Args:
question: 研究问题
task_id: 可选任务 ID

Returns:
任务对象
"""
import uuid

if task_id is None:
task_id = f"research_{uuid.uuid4().hex[:8]}"

task = ResearchTask(
task_id=task_id,
question=question,
status=ResearchStatus.PENDING,
)

self._repository.save_task(task)
logger.info("创建研究任务: %s - %s", task_id, question)
return task

def get_task(self, task_id: str) -> ResearchTask | None:
"""获取研究任务。

Args:
task_id: 任务 ID

Returns:
任务对象或 None
"""
return self._repository.get_task(task_id)

async def run_parallel_research(self, task: ResearchTask) -> bool:
"""并行运行三个研究员的研究。

Args:
task: 研究任务

Returns:
是否成功完成至少一项研究
"""
task.status = ResearchStatus.RESEARCHING
self._repository.save_task(task)

logger.info("[%s] 开始并行研究", task.task_id)

# 导入研究工具
from .tools import research_business, research_technical, research_user

# 执行研究(注:实际生产环境可用 asyncio.gather 真正并行)
# 这里为了演示清晰,顺序执行,生产环境可改为真正的并行
success_count = 0

try:
# 技术研究
logger.info(" 技术研究员工作中...")
tech_result = research_technical.invoke({"question": task.question})
if "失败" not in tech_result:
task.add_finding(ResearchRole.TECHNICAL, "技术视角", tech_result)
success_count += 1
except Exception as e:
logger.error("技术研究异常: %s", e)

try:
# 商业研究
logger.info(" 商业研究员工作中...")
biz_result = research_business.invoke({"question": task.question})
if "失败" not in biz_result:
task.add_finding(ResearchRole.BUSINESS, "商业视角", biz_result)
success_count += 1
except Exception as e:
logger.error("商业研究异常: %s", e)

try:
# 用户研究
logger.info(" 用户研究员工作中...")
user_result = research_user.invoke({"question": task.question})
if "失败" not in user_result:
task.add_finding(ResearchRole.USER, "用户视角", user_result)
success_count += 1
except Exception as e:
logger.error("用户研究异常: %s", e)

self._repository.save_task(task)
logger.info("并行研究完成,成功 %d/3 个视角", success_count)
return success_count > 0

async def run_debate(self, task: ResearchTask) -> bool:
"""运行辩论阶段。

Args:
task: 研究任务

Returns:
是否成功
"""
task.status = ResearchStatus.DEBATING
self._repository.save_task(task)

logger.info("[%s] 开始辩论阶段", task.task_id)

from .tools import debate_findings

try:
debate_result = debate_findings.invoke({})
if "尚无" in debate_result or "失败" in debate_result:
logger.warning("辩论未完成: %s", debate_result)
return False

# 解析辩论结果并保存
from .models import DebateResult

task.debate_result = DebateResult(
overall_assessment=debate_result,
)
self._repository.save_task(task)

logger.info("辩论完成")
return True
except Exception as e:
logger.error("辩论异常: %s", e)
return False

async def run_synthesis(self, task: ResearchTask) -> str | None:
"""运行报告综合阶段。

Args:
task: 研究任务

Returns:
报告内容(Markdown),失败返回 None
"""
task.status = ResearchStatus.SYNTHESIZING
self._repository.save_task(task)

logger.info("[%s] 开始报告综合阶段", task.task_id)

from .tools import synthesize_report

try:
report_content = synthesize_report.invoke({})
if "需要先完成" in report_content or "失败" in report_content:
logger.warning("报告生成未完成: %s", report_content)
return None

# 构造报告对象并持久化(用于历史追溯)
from .models import FinalReport

task.final_report = FinalReport(
title=f"研究报告:{task.question}",
executive_summary="",
background="",
multi_perspective_analysis=task.all_findings_text(),
consensus_and_debate=task.debate_result.overall_assessment if task.debate_result else "",
conclusions=[],
recommendations=[],
limitations=["基于 AI 分析,仅供参考"],
)
task.mark_completed()
self._repository.save_task(task)

logger.info("报告综合完成")
return report_content
except Exception as e:
logger.error("报告综合异常: %s", e)
return None

async def run_full_research(self, question: str, task_id: str | None = None) -> str:
"""运行完整的研究流程。

Args:
question: 研究问题
task_id: 可选任务 ID

Returns:
最终研究报告
"""
task = self.create_task(question, task_id)

# 清空之前的内存存储
from .tools import clear_research
clear_research.invoke({})

# 阶段 1: 并行研究
success = await self.run_parallel_research(task)
if not success:
return "❌ 研究阶段失败,请重试"

# 阶段 2: 辩论
await self.run_debate(task)

# 阶段 3: 综合报告
report = await self.run_synthesis(task)
if report:
return report

return "⚠️ 研究已完成,但报告生成失败。"

def list_recent_tasks(self, limit: int = 20) -> list[dict[str, Any]]:
"""列出最近的研究任务。

Args:
limit: 返回数量限制

Returns:
任务摘要列表
"""
return self._repository.list_tasks(limit)

def search_tasks(self, keyword: str) -> list[dict[str, Any]]:
"""搜索历史研究任务。

Args:
keyword: 搜索关键词

Returns:
匹配的任务摘要列表
"""
return self._repository.search_tasks(keyword)


# 全局服务实例
_service: ResearchAnalysisService | None = None


def get_service() -> ResearchAnalysisService:
"""获取研究分析服务实例。

Returns:
ResearchAnalysisService 单例
"""
global _service
if _service is None:
_service = ResearchAnalysisService()
return _service

project.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""项目八:项目定义层。

定义多智能体研究分析平台的项目注册和对外接口。
"""
from __future__ import annotations

from typing import Any

from core import BaseProject, registry

from .service import get_service
from .tools import get_all_tools


class ResearchAnalysisProject(BaseProject):
"""多智能体研究分析平台。

主讲能力:Multi-Agent 深度协作——并行研究 + 辩论 + 综合

业务场景:用户提出一个研究问题,多个研究员 Agent 从不同角度独立研究,
然后辩论 Agent 对比各方观点找出矛盾,最后由综合 Agent 生成共识报告。

生产级特性:
- 并行研究员:3 个 Agent 同时从不同角度研究(技术/商业/用户视角)
- 辩论机制:对比各方观点,找出矛盾与共识
- 综合报告:整合所有观点生成结构化研究报告
- 研究过程可追溯(每个研究员的原始输出保留)
- 数据持久化:所有研究结果存入数据库,支持历史查询
"""

id = "p08_research_analysis"
name = "多智能体研究分析平台"
description = "3 研究员并行研究 + 辩论 + 综合报告,多视角深度协作。"
capabilities = ["Multi-Agent", "Research", "Debate", "Synthesis"]

def build_agent(self) -> Any:
"""构建首席研究员 Agent 实例。

Returns:
LangChain Agent 对象
"""
service = get_service()
return service.build_agent()

def run(self, message: str) -> str:
"""运行研究任务。

Args:
message: 用户输入的研究问题

Returns:
研究报告
"""
import asyncio

service = get_service()

try:
# 尝试运行完整工作流
loop = asyncio.get_event_loop_policy().get_event_loop()
if loop.is_running():
# 如果已有事件循环(如在服务中),使用 Agent 直接运行
agent = self.build_agent()
return agent.run(message)
else:
# 否则异步运行完整工作流
return asyncio.run(service.run_full_research(message))
except Exception as e:
# 降级方案:直接用 Agent 运行
try:
agent = self.build_agent()
return agent.run(message)
except Exception as e2:
return f"❌ 研究执行失败: {e2}"


# 项目实例
project = ResearchAnalysisProject()

# 对外暴露的快捷函数
run = project.run
build_agent = project.build_agent
get_tools = get_all_tools

__all__ = [
"ResearchAnalysisProject",
"project",
"run",
"build_agent",
"get_tools",
]

init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""项目八:多智能体研究分析平台(Multi-Agent 深度协作)

主讲能力:Multi-Agent 深度协作——并行研究 + 辩论 + 综合

业务场景:用户提出一个研究问题,多个研究员 Agent 从不同角度独立研究,
然后辩论 Agent 对比各方观点找出矛盾,最后由综合 Agent 生成共识报告。

生产级特性:
- 并行研究员:3 个 Agent 同时从不同角度研究(技术/商业/用户视角)
- 辩论机制:对比各方观点,找出矛盾与共识
- 综合报告:整合所有观点生成结构化研究报告
- 研究过程可追溯(每个研究员的原始输出保留)
- 数据持久化:所有研究结果存入数据库,支持历史查询

架构分层:
- models.py: 数据模型、研究对象
- prompts.py: 各研究员系统提示词
- tools.py: 研究工具函数
- repositories.py: 数据持久化存储
- service.py: 业务逻辑、工作流编排
- project.py: 项目定义、对外接口
"""
from __future__ import annotations

from core import registry

# 导出 models 层
from .models import (
DebateResult,
FinalReport,
ResearchFinding,
ResearchRole,
ResearchStatus,
ResearchTask,
)

# 导出 prompts 层
from .prompts import (
BUSINESS_RESEARCHER_PROMPT,
DEBATER_PROMPT,
LEAD_RESEARCHER_PROMPT,
SYNTHESIZER_PROMPT,
TECHNICAL_RESEARCHER_PROMPT,
USER_RESEARCHER_PROMPT,
)

# 导出 tools 层(保持向后兼容)
from .tools import (
clear_research,
debate_findings,
get_all_findings,
get_all_tools,
research_business,
research_technical,
research_user,
synthesize_report,
)

# 导出 repositories 层
from .repositories import ResearchRepository, ResearchStore, get_repository

# 导出 service 层
from .service import ResearchAnalysisService, get_service

# 导出 project 层
from .project import (
ResearchAnalysisProject,
build_agent,
get_tools,
project,
run,
)

# 向后兼容别名:原单文件版本使用 SYSTEM_PROMPT
SYSTEM_PROMPT = LEAD_RESEARCHER_PROMPT

# 注册项目到全局注册表
registry.register(project)

__all__ = [
# models
"ResearchRole",
"ResearchStatus",
"ResearchFinding",
"DebateResult",
"FinalReport",
"ResearchTask",
# prompts
"LEAD_RESEARCHER_PROMPT",
"SYSTEM_PROMPT",
"TECHNICAL_RESEARCHER_PROMPT",
"BUSINESS_RESEARCHER_PROMPT",
"USER_RESEARCHER_PROMPT",
"DEBATER_PROMPT",
"SYNTHESIZER_PROMPT",
# tools
"research_technical",
"research_business",
"research_user",
"debate_findings",
"synthesize_report",
"get_all_findings",
"clear_research",
"get_all_tools",
# repositories
"ResearchRepository",
"ResearchStore",
"get_repository",
# service
"ResearchAnalysisService",
"get_service",
# project
"ResearchAnalysisProject",
"project",
"run",
"build_agent",
"get_tools",
]

10.4.2 核心代码讲解

① 三个研究员,同一道题,各自作答。 tools.py 里站着三位研究员:research_technicalresearch_businessresearch_user。它们长得很像——都是“拿 Prompt → 调模型 → 存结果”——但灵魂不同:技术研究员盯着可行性和架构,商业研究员盯着市场和成本,用户研究员盯着需求和痛点。三套 Prompt 互不见面,谁也不影响谁。

service.pyrun_parallel_research 把这三个工具串起来,把各自的产出以 ResearchFinding 的形式追加到同一个 ResearchTask 里。于是在同一道题上,我们攒下了三份独立的判断——这正是后面辩论的弹药。

💡 顿悟时刻:为什么要“独立”? 想象商业研究员在动笔前偷看了技术研究员的稿子。他大概率会顺着技术的结论往下写——要么附和(技术都说能做,那市场肯定有戏),要么抬杠(技术太乐观,我得泼冷水)。无论哪种,他都不再是“商业视角”,而成了“技术视角的回声”。独立性是辩论有意义的前提,没有独立,所谓多视角就是一个人换三顶帽子。

⚠️ 避坑:这里的“并行”目前是名义上的。 翻开 run_parallel_research 你会发现,三个工具其实是顺序执行的,代码注释也写明了“为了演示清晰,顺序执行”。真要并行,得用 asyncio.gather 把三个工具并发跑起来——好在分层架构把这层改动关在了服务层一处,工具层一行都不用动。文档里写的“并行研究”是目标态,当前代码是顺序近似,这一点别被名字骗了。

⚠️ 避坑:工具间靠模块级全局变量传话。 三个研究员把结果写进 _in_memory_findings 这个模块级字典,辩论和综合再从这里读。这意味着同时跑两个研究任务会互相踩——A 任务的研究结果会被 B 任务覆盖。单用户演示没问题,多租户生产环境必须把这块状态挪进任务上下文或数据库。

② 辩论:不是再写一份观点,而是给观点照 X 光。 研究做完了,debate_findings 上场。它把三方发现拼成一份统一上下文,交给 DEBATER_PROMPT 驱动的辩论主持人。

注意辩论主持人的活儿和别人不一样——他不产生新观点,他做“元分析”:哪些是三方都点头的共识?哪些是互相打架的分歧?有哪些重要问题是三方都没碰的盲区?每一方又各自带来了什么独特贡献?这套结构沉淀成 DebateResultto_markdown 能把共识、分歧、盲区清清楚楚地摊开。

💡 顿悟时刻:为什么要先辩论,再综合? 为什么不直接把三份报告扔给综合 Agent 让它合并?因为大模型天生是“和事佬”。你把三份报告一锅端给它,它十有八九会端出一碗粥:“技术可行,市场存在挑战,用户需求中等,建议谨慎推进。” 听着面面俱到,实则把真正的矛盾给抹平了。

辩论这一步,就是逼着系统先把矛盾叫出名来——“技术说三个月能出 demo,商业说预算撑不过两个月,这就是分歧”——矛盾被命名、被看见,才有可能被真正解决。综合 Agent 拿到的不是三份各说各话的报告,而是一份已经把分歧标注好的地图,写出来的结论自然有张力、有取舍。

⚠️ 避坑:当前代码的辩论结果是“整坨”存的。 严格说,DebateResult 定义了 consensus_pointscontradiction_pointsblind_spots 这些结构化字段,但 run_debate 只把模型返回的整段文本塞进了 overall_assessment 一个字段,并没有解析成结构化列表。也就是说,模型其实照着结构化框架输出了,但代码没接住to_markdown 的优雅排版在服务流程里基本没派上用场。这是后续可以补强的地方——加个解析器,把模型输出映回结构化字段。

③ 仓储层:让每一次研究都留痕。 repositories.pyResearchRepository 用 SQLite 落库,四张表分工明确:任务表存元信息,研究发现表存三方原始产出,辩论结果表存辩论,最终报告表存结论。

写入策略是“先删后插”——save_task 每次先清掉该任务的旧数据再写新的,保证同一个任务可以反复更新而不留脏数据。列表和字典类字段(共识点、结论、建议)用 json.dumps(ensure_ascii=False) 序列化成文本存,读回时 json.loads 还原。list_tasks 按时间倒序浏览,search_tasks 按关键词检索——每一次研究都可追溯、可复查,这一点在严肃决策场景里比报告本身还重要。

④ 服务层:把流程编排成一部三幕剧。 ResearchAnalysisService 把完整研究拆成三幕:run_parallel_research(研究)→ run_debate(辩论)→ run_synthesis(综合),由 run_full_research 统一指挥。

每一幕开场前,先把 ResearchStatus 推到对应状态(RESEARCHING → DEBATING → SYNTHESIZING)并落库;幕落了再落一次库。于是数据库里留下了一条清晰的状态机轨迹,出了问题一眼能看出卡在哪一幕。build_agentcreate_agent 装配首席研究员 Agent,工具来自 get_all_tools,系统提示词是 LEAD_RESEARCHER_PROMPT,懒加载、带缓存。

⑤ 错误降级:每一层都给自己留后路。 这套系统在容错上挺“怂”——但怂得有道理。研究员工具用 try/except 兜住模型调用异常,失败了不抛,而是返回一串带 ❌ 的提示;run_parallel_research 给每个视角单独 try/except,只要有一项成功就继续往下走,靠 "失败" not in result 判断产出是否有效;run_debate 看到结果里带“尚无”“失败”就返回 False,run_synthesis 看到“需要先完成”“失败”就返回 None,谁都不让流程硬中断。

project.pyrun 更是做了双层兜底:先探一探当前有没有在跑的事件循环,据此决定用 asyncio.run 还是 agent.run;外层再套一个 try,万一全崩了,退化为直接调 Agent。宗旨就一条:任何环节出问题,都给用户一个能读的回复,而不是一个 traceback。

⑥ 分层的好处:换零件不拆房子。 模型层只管定义数据,提示词层只管描述角色,工具层只管单步执行,仓储层只管存取,服务层只管编排,项目层只管对外。六层各司其职,带来三样东西:

一是可测试——每层都能单独 mock,仓储层注入个内存库、工具层注入个假模型,单测就能跑;二是可替换——研究员现在直接调模型,将来要换 Web Search、论文库或 RAG,只动工具层,服务层不眨眼;三是可演进——前面说的“顺序升真并行”,也只改 run_parallel_research 一处。__init__.py 把各层符号统一 re-export,再顺手 registry.register,外部 import 一下包就能用。

金句:分层的本质,是给未来的改动画好“施工范围”。 改哪一层,灰尘就只落在哪一层。


10.5 跑一跑:它真的行吗

测试要盯的不是“报告好不好看”,而是“流水线每一段是否站得住”。重点这么几条:

  • ResearchStore 能不能正确保存和聚合多视角结果(这是最底层的数据合同)。
  • 三个研究员工具各自是否返回非空文本——空文本意味着某一视角“失声”,后面辩论就少了弹药。
  • 综合工具产出的报告里,是否真的包含决策建议风险清单,而不是一团和气的废话。
  • 集成测试跑一遍完整流程(研究 → 辩论 → 综合),验证三幕剧能从头演到尾。

10.6 送上线:让它上班

前端输入研究问题后,执行过程面板会依次显示三个研究员的产出,再显示辩论和最终报告——用户看到的不是一锤子结论,而是结论是怎么长出来的。生产环境务必开启日志落库,让每个结论都能追溯到“它来自哪个视角、经过了怎样的辩论”。审计能不能查、敢不敢查,是这套系统从“演示”走向“决策辅助”的分水岭。


10.7 回头看:学到了什么

回到那场吵不出结果的产品会。技术、商业、用户三方僵持不下,其实不是问题,而是信号——它说明这件事值得从三个角度都看一遍。

本章给了这个信号一个去处:让三个研究员 Agent 各自独立研究,让辩论 Agent 把矛盾叫出名来,让综合 Agent 在已知分歧的前提下给出判断。这就是 Multi-Agent 的第二种价值——不是流水线协作,而是多视角对抗

金句:好的决策不是给一个答案,而是让不同视角充分表达,再形成可解释的综合判断。

第 9 章的 Agent 像接力赛,棒交下去,彼此信任;本章的 Agent 像辩论赛,谁也不轻信谁,非得把矛盾摆上台面。前者解决“怎么把一件事做顺”,后者解决“怎么把一件事想透”。

当然,本章代码离生产级还有几段路要走:研究是顺序而非真并行、工具间靠全局变量传话、辩论结果没解析成结构化、研究员还没接真实数据源。但这些“留白”恰恰是分层架构留下的礼物——每一处短板,都已经被关在它该在的那一层里,等你去填。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !