AI项目实战(10)多智能体内容创作平台

《AI Agent 实战》系列 · 多智能体内容创作平台

Posted by Ryan on 2026-07-02
Estimated Reading Time 36 Minutes
Words 8.7k In Total
Viewed Times

主讲能力:Multi-Agent 协作、共享状态、并行节点
业务场景:用户输入一个主题,系统自动组织一支"内容团队"完成策划、写作、审稿、配图建议和最终汇总。
对应代码backend/projects/p07_content_creation/


9.1 编辑的困境

周六凌晨一点,公众号编辑小李还对着屏幕发呆。一篇《AI Agent 工程化实践》的文章,他从下午写到深夜:先憋大纲,大纲改了三版才动笔写正文;正文写到一半又觉得结构不对,回头改大纲;好不容易写完,自己审一遍发现前后文风不统一;最后还得自己找配图、写图注。

最让他崩溃的不是累,而是“身份切换”——上一秒还在以策划的口吻想“读者到底想看什么”,下一秒就得用审校的眼光挑“这段逻辑通不通”,再下一秒又要切成设计师琢磨“这里配张什么图”。一个人演完整个编辑部,每个角色都演不到位。

小李的处境,正是单 Agent 做内容创作的缩影。把“想清楚写什么、写出来、挑毛病、配图”全塞进一个 Agent 的上下文,它和小李一样分身乏术:一会儿顾结构、一会儿顾文笔、一会儿顾配图,哪样都不精,输出自然散乱、忽高忽低。

后来小李的公司补了个小团队:策划、撰稿、审校、设计各一人。每人只干自己那摊事,彼此接力——策划定大纲交给撰稿,撰稿写完交审校,审校改完配设计给的图,主编最后拼成稿。质量上去了,谁也不必精分。

本项目要做的,就是给 AI 也组一支这样的小编辑部:让多个 Agent 各司其职,而不是逼一个 Agent 全能。对应到角色分工:

角色 职责
策划 Agent 明确主题、受众、文章结构
写作 Agent 根据大纲撰写正文
审稿 Agent 统一风格、检查逻辑、提出修改建议
配图 Agent 生成配图建议和 AI 绘图提示词
汇总节点 输出完整 Markdown 文稿

功能需求

  • 用户输入主题后自动生成完整文章。
  • 各 Agent 角色职责清晰,不互相越界。
  • 写作和配图只依赖大纲、彼此无依赖,可并行以提高效率(实现见 9.4.2)。
  • 最终输出 Markdown,可直接发布到博客或 CMS。

9.2 画个样子:它该长啥样

整个编辑部怎么接力?看这张流程图就一目了然——策划打头阵定大纲,写作和配图都从大纲分叉出去,审校等正文写完再上场,最后汇总节点把所有人的活儿拼成一篇稿:

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
    Topic["用户主题"] --> Planner["策划 Agent"]
    Planner --> Writer["写作 Agent"]
    Planner --> Illustrator["配图 Agent"]
    Writer --> Reviewer["审稿 Agent"]
    Reviewer --> Assemble["汇总节点"]
    Illustrator --> Assemble
    Assemble --> Final["最终 Markdown 文稿"]

    classDef role fill:#e0f7fa,stroke:#00acc1,stroke-width:2px,color:#006064
    classDef out fill:#e8f5e9,stroke:#43a047,stroke-width:2px,color:#1b5e20
    class Planner,Writer,Reviewer,Illustrator,Assemble role
    class Final out

💡 实现说明:图里画的"写作与配图并行"不是空想——graph.pyasyncio.gather 把写作和配图同时发起,两者都只依赖大纲、各写各的共享记忆键(section:*images),互不干扰。审校依赖正文,所以排在并行环节之后。细节在 9.4.2 展开。

用户只需输入:

写一篇关于 AI Agent 工程化实践的文章。

系统输出包括:大纲、正文、审稿意见、配图建议。


9.3 拆开看:怎么造出来

先说清楚一个事实,免得你翻代码时犯迷糊:本项目并没有用 LangGraph 的 StateGraph,共享状态也不是一个 State 类型,而是一个自定义的 SharedMemory 类(见 models.py)。工作流由 ContentCreationGraph 这个类手动编排,主编 Agent 用 LangChain 的 create_agent 构建。早期文档里“StateGraph / State”的叫法,更像是设计阶段的设想,落地时换成了更轻的共享记忆池。下面按实际实现来讲。

为什么不做一个“全能 Agent”?

刚接触 Multi-Agent 时,很多人都会冒出一个疑问:现在的大模型这么强,一个 Agent 既能规划、又能写作、又能审校,干嘛非得拆开?拆开不就多一层调度、多一堆通信吗?

这疑问合理,但真上手就明白了。我最早试的就是“全能 Agent”——一个 Prompt 塞进“先想大纲、再写正文、再审稿、最后配图”全套指令。结果很尴尬:模型一会儿忘了写到第几章,一会儿把审稿意见混进正文,配图提示词干脆漏了。上下文越长、角色越杂,它越容易“走神”,因为得在同一份注意力里不停切换人格。

💡 顿悟时刻:人脑也一样。让一个人同时当策划、撰稿、审校、设计,他也会乱。高效编辑部靠的不是“一个全能编辑”,而是“每个岗位只盯一件事”。Agent 同理——角色隔离,本质是给模型减负。一个 Agent 一个身份、一份聚焦的上下文,输出反而稳。

所以 Multi-Agent 的价值不在“显得高级”,而在用角色隔离降低复杂度。策划 Agent 的上下文里只有大纲,写作 Agent 的上下文里只有正文,互不干扰,各自专精。

Agent 之间怎么交接?——共享记忆池

各司其职不等于各干各的。策划写好的大纲,撰稿得看到;撰稿写完的正文,审校得读到。最直觉的做法是 Agent 之间互相传参,但耦合太紧,加一个角色就得改一串调用链。

本项目的做法是引入共享记忆池(SharedMemory):每个 Agent 干完活,把成果往池子里一扔(put);下一个 Agent 上班时,从池子里取自己要的(get)。Agent 之间不见面,只跟记忆池打交道。这就像编辑部的共享文档:策划把大纲放进去,撰稿打开看,写完正文也放进去,审校再打开读。谁也不必认识谁,文档是唯一的交接物。

⚠️ 避坑:共享记忆池好用,但别什么都往里塞。本项目只放各角色的关键产出(大纲、章节正文、审稿意见、配图建议),不放中间的琐碎思考。池子越臃肿,后续 Agent 被无关信息干扰的风险越大。

并行节点为什么可行(设计层面)

看流程图会发现:写作和配图都只依赖大纲,彼此无依赖,理论上能同时开工。这点 9.2 已经提过,这里再强调一次设计意图,方便理解图。

核心思想记牢:每个 Agent 只改自己负责的状态字段,多角色不互相污染;写作和配图只读大纲、各写各的字段,天然具备并行的基础。代码里也确实用 asyncio.gather 把这两步同时发起,9.4.2 讲 run_full_workflow 时细说。


9.4 动手写:三层架构完整代码

接下来看代码。这套多智能体内容创作平台按分层架构组织——从数据模型、提示词、工具,到工作流编排与项目注册,一层一层叠起来。各层职责清晰、单向依赖,和编辑部“各司其职”是同一个思路:谁的地盘谁做主,互不越位。

9.4.1 三层架构完整代码

这套“各司其职”落到代码上,就是六层分明:模型层、提示词层、工具层、编排层、项目层、入口。各层对应文件与职责如下:

层次 文件 职责
模型层 models.py Agent 角色、内容模型、共享记忆池
提示词层 prompts.py 各角色系统提示词(策划/写作/审校/配图)
工具层 tools.py 各 Agent 工具函数
编排层 graph.py Multi-Agent 工作流编排
项目层 project.py 项目注册、对外接口
入口 init.py 注册与 re-export

下面依次给出每个文件的完整代码,原汁原味,不省一行。

models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
"""项目七:数据模型层。

定义多智能体内容创作平台的领域模型和数据结构。
"""
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class AgentRole(str, Enum):
"""Agent 角色枚举。"""
PLANNER = "planner" # 策划师
WRITER = "writer" # 撰稿人
REVIEWER = "reviewer" # 审稿人
DESIGNER = "designer" # 配图设计师
EDITOR = "editor" # 主编(总控)


class TaskStatus(str, Enum):
"""任务状态枚举。"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"


@dataclass
class ContentOutline:
"""文章大纲模型。"""
topic: str
title: str = ""
introduction: str = ""
sections: list[str] = field(default_factory=list)
conclusion: str = ""

def to_markdown(self) -> str:
"""转换为 Markdown 格式。"""
lines = [f"# {self.title or self.topic}", "", "## 导语", self.introduction]
for i, section in enumerate(self.sections, 1):
lines.extend(["", f"## 第 {i} 章:{section}"])
if self.conclusion:
lines.extend(["", "## 结语", self.conclusion])
return "\n".join(lines)


@dataclass
class ArticleSection:
"""文章章节模型。"""
title: str
content: str
order: int = 0
word_count: int = 0

def __post_init__(self) -> None:
"""计算字数。"""
self.word_count = len(self.content)


@dataclass
class ImageSuggestion:
"""配图建议模型。"""
section_title: str
image_type: str # 示意图、数据图、场景图、概念图
prompt: str # AI 生成提示词(英文)
description: str = "" # 中文描述


@dataclass
class ReviewFeedback:
"""审稿反馈模型。"""
overall_score: int # 1-10 分
strengths: list[str] = field(default_factory=list)
issues: list[str] = field(default_factory=list)
suggestions: list[str] = field(default_factory=list)


@dataclass
class ContentTask:
"""内容创作任务模型。"""
task_id: str
topic: str
status: TaskStatus = TaskStatus.PENDING
outline: ContentOutline | None = None
sections: list[ArticleSection] = field(default_factory=list)
images: list[ImageSuggestion] = field(default_factory=list)
review: ReviewFeedback | None = None
final_article: str = ""

def add_section(self, title: str, content: str, order: int = 0) -> None:
"""添加章节。"""
self.sections.append(ArticleSection(
title=title,
content=content,
order=order,
))

def get_total_words(self) -> int:
"""获取总字数。"""
return sum(s.word_count for s in self.sections)


class SharedMemory:
"""多 Agent 共享记忆池。

所有 Agent 的输出都存入这里,后续 Agent 可读取。
"""

def __init__(self) -> None:
self._store: dict[str, Any] = {}
self._order: list[str] = []

def put(self, key: str, content: Any) -> None:
"""存入记忆。

Args:
key: 记忆键
content: 记忆内容
"""
self._store[key] = content
if key not in self._order:
self._order.append(key)

def get(self, key: str) -> Any:
"""读取记忆。

Args:
key: 记忆键

Returns:
记忆内容,不存在返回空字符串
"""
return self._store.get(key, "")

def keys(self) -> list[str]:
"""获取所有记忆键。"""
return list(self._order)

def summary(self) -> str:
"""生成记忆池摘要。

Returns:
摘要文本
"""
lines = ["📦 共享记忆池内容:", ""]
for k in self._order:
content = self._store[k]
preview = str(content)[:80].replace("\n", " ")
lines.append(f" [{k}] {preview}...")
return "\n".join(lines)

def clear(self) -> None:
"""清空记忆池。"""
self._store.clear()
self._order.clear()


# 全局共享记忆实例
shared_memory = SharedMemory()

prompts.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""项目七:Prompt 层。

定义多智能体内容创作平台各角色的系统提示词。
"""
from __future__ import annotations

# 主编 Agent 系统提示词
EDITOR_SYSTEM_PROMPT = """你是内容创作平台的主编 Agent,负责协调团队完成高质量文章创作。

## 你的团队
- 👨‍💼 策划师 (plan_content):生成文章大纲
- ✍️ 撰稿人 (write_section):撰写各章节内容
- 🔍 审稿人 (review_content):审查质量并提供反馈
- 🎨 配图设计师 (suggest_images):提供配图建议

## 工作流程(严格执行)
1. **第一步**:调用 plan_content 生成文章大纲
2. **第二步**:调用 write_section 逐章节撰写(至少写 2-3 个章节)
3. **第三步**:全部写完后调用 review_content 进行审校
4. **第四步**:调用 suggest_images 为文章配图
5. **第五步**:合并所有内容,输出完整文章

## 质量要求
- 文章结构清晰,逻辑连贯
- 语言通俗易懂,适合目标读者
- 每章节 800-1500 字
- 配图建议具体,可用于 AI 生成

## 最终交付格式
```markdown
# 文章标题

## 导语
(导语内容)

## 章节 1
(章节内容)

## 章节 2
(章节内容)

## 结语
(结语内容)

---

## 审校反馈
(审稿意见)

---

## 配图建议
(每张图的建议)
```"""


# 策划师 Agent 提示词
PLANNER_PROMPT = """你是资深内容策划师,擅长设计高质量文章的结构和大纲。

请为主题「{topic}」生成详细的文章大纲。

## 大纲要求
1. 吸引人的标题(结合热点和 SEO)
2. 100-200 字的导语,说明文章价值
3. 3-5 个核心章节,每章标题明确
4. 简短的结语,总结要点并引导行动
5. 明确每个章节的核心观点和写作方向

## 输出格式
```markdown
# 文章标题

## 导语
(导语内容)

## 章节列表
1. 第一章标题 - (本章核心要点)
2. 第二章标题 - (本章核心要点)
3. 第三章标题 - (本章核心要点)

## 结语
(结语方向)
```"""


# 撰稿人 Agent 提示词
WRITER_PROMPT = """你是专业撰稿人,文风通俗生动,擅长把复杂概念讲清楚。

## 文章大纲
{outline}

## 当前任务
请撰写章节「{section_title}」的完整内容。

## 写作要求
1. 800-1500 字
2. 段落分明,每段不超过 5 行
3. 使用生活化案例辅助理解
4. 适当使用列表和加粗强调重点
5. 结尾过渡到下一章或总结本章要点

## 目标读者
互联网行业从业者、产品经理、工程师,具有一定技术背景但非专家。

请直接输出章节内容,不需要解释写作思路。"""


# 审稿人 Agent 提示词
REVIEWER_PROMPT = """你是首席编辑,具有 10 年以上审稿经验。

请审查以下文章内容,给出专业反馈。

## 已完成内容
{all_content}

## 审查维度
1. **逻辑连贯性**:章节之间是否衔接自然?论点是否有足够支撑?
2. **语言表达**:是否有语法错误?语句是否通顺易懂?
3. **内容质量**:是否有遗漏的重要观点?案例是否贴切?
4. **可读性**:段落长度是否合适?重点是否突出?

## 输出格式
```
【整体评分】X/10

【优点】
- (列出 2-3 个优点)

【问题】
- (列出具体问题及位置)

【修改建议】
- (具体可操作的建议)
```"""


# 配图设计师 Agent 提示词
DESIGNER_PROMPT = """你是视觉设计师,擅长为文章设计配图方案。

## 文章大纲
{outline}

## 任务
为每个章节设计 1-2 张配图建议。

## 配图要求
1. 图片类型:示意图、数据图、场景图、概念图四选一
2. AI 生成提示词:英文,简洁准确,包含主体、风格、色调
3. 中文描述:说明图片在文章中的作用和位置

## 输出格式
```
## 第 N 章:章节标题

### 配图 1:[图片类型]
英文 Prompt: (stable diffusion / DALL-E prompt)
说明:(这张图表达什么,放在哪里合适)

### 配图 2:[图片类型]
英文 Prompt: ...
说明:...
```"""


# 内容整合提示词
ASSEMBLE_PROMPT = """请将以下内容整合成一篇完整的文章。

## 大纲
{outline}

## 已完成章节
{sections_text}

## 审校反馈
{review_text}

## 配图建议
{images_text}

要求:
1. 保持 Markdown 格式
2. 标题层级正确
3. 各部分之间有清晰的分隔线
4. 格式美观,便于阅读"""

tools.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
"""项目七:工具层。

定义多智能体内容创作平台的各角色工具函数。
"""
from __future__ import annotations

from typing import Any

from langchain.tools import tool

from core import build_chat_model
from core.logging_conf import get_logger

from .models import shared_memory
from .prompts import (
DESIGNER_PROMPT,
PLANNER_PROMPT,
REVIEWER_PROMPT,
WRITER_PROMPT,
)

logger = get_logger("p07.content_creation.tools")


@tool
def plan_content(topic: str) -> str:
"""策划师:根据主题生成内容大纲和结构规划。

Args:
topic: 文章主题

Returns:
Markdown 格式的大纲(标题 + 导语 + 章节列表 + 结语)
"""
logger.info("策划师开始工作,主题: %s", topic)

prompt = PLANNER_PROMPT.format(topic=topic)

try:
model = build_chat_model()
result = model.invoke(prompt)
outline = result.content if hasattr(result, "content") else str(result)

shared_memory.put("outline", outline)
shared_memory.put("topic", topic)

logger.info("大纲生成完成,长度: %d", len(outline))
return outline
except Exception as e:
logger.error("大纲生成失败: %s", e)
return f"❌ 大纲生成失败: {e}"


@tool
def write_section(section_title: str) -> str:
"""撰稿人:根据大纲中指定的章节标题,撰写该章节的完整内容。

会先读取共享记忆中的大纲,确保内容与大纲一致。

Args:
section_title: 要撰写的章节标题

Returns:
章节内容
"""
logger.info("撰稿人开始工作,章节: %s", section_title)

outline = shared_memory.get("outline")
if not outline:
return "⚠️ 请先生成文章大纲,再开始写作。"

prompt = WRITER_PROMPT.format(
outline=outline,
section_title=section_title,
)

try:
model = build_chat_model()
result = model.invoke(prompt)
content = result.content if hasattr(result, "content") else str(result)

key = f"section:{section_title}"
shared_memory.put(key, content)

logger.info("章节「%s」撰写完成,字数: %d", section_title, len(content))
return content
except Exception as e:
logger.error("章节撰写失败: %s", e)
return f"❌ 章节撰写失败: {e}"


@tool
def review_content() -> str:
"""审稿人:审查共享记忆中所有已完成内容,检查逻辑、语法、可读性。

Returns:
审校意见(问题列表 + 修改建议)
"""
logger.info("审稿人开始工作")

all_content = []
section_count = 0

for key in shared_memory.keys():
if key.startswith("section:"):
content = shared_memory.get(key)
section_title = key[8:] # 去掉 "section:" 前缀
all_content.append(f"--- {section_title} ---\n{content}")
section_count += 1

if section_count == 0:
return "⚠️ 尚无内容可供审校,请先撰写至少一个章节。"

combined = "\n\n".join(all_content)

prompt = REVIEWER_PROMPT.format(all_content=combined[:8000])

try:
model = build_chat_model()
result = model.invoke(prompt)
feedback = result.content if hasattr(result, "content") else str(result)

shared_memory.put("review_feedback", feedback)

logger.info("审校完成")
return feedback
except Exception as e:
logger.error("审校失败: %s", e)
return f"❌ 审校失败: {e}"


@tool
def suggest_images() -> str:
"""配图设计师:根据文章大纲,为每个章节建议配图方案。

Returns:
配图建议列表(图片类型 + AI 生成 prompt)
"""
logger.info("配图设计师开始工作")

outline = shared_memory.get("outline")
if not outline:
return "⚠️ 请先生成文章大纲。"

prompt = DESIGNER_PROMPT.format(outline=outline[:3000])

try:
model = build_chat_model()
result = model.invoke(prompt)
images = result.content if hasattr(result, "content") else str(result)

shared_memory.put("images", images)

logger.info("配图建议生成完成")
return images
except Exception as e:
logger.error("配图建议生成失败: %s", e)
return f"❌ 配图建议生成失败: {e}"


@tool
def read_memory(key: str = "") -> str:
"""读取共享记忆池中的内容。

Args:
key: 记忆键,留空查看所有可用的记忆键和摘要

Returns:
记忆内容或摘要
"""
if not key:
return shared_memory.summary()

content = shared_memory.get(key)
if not content:
return f"⚠️ 记忆键 '{key}' 不存在,可用的键有:{', '.join(shared_memory.keys())}"
return content


@tool
def clear_memory() -> str:
"""清空共享记忆池,用于开始新的创作任务。

Returns:
操作结果
"""
shared_memory.clear()
logger.info("共享记忆池已清空")
return "✅ 共享记忆池已清空,可以开始新的创作任务。"


def get_all_tools() -> list[Any]:
"""获取所有可用工具列表。

Returns:
工具对象列表
"""
return [
plan_content,
write_section,
review_content,
suggest_images,
read_memory,
clear_memory,
]

graph.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""项目七:Graph 层(Multi-Agent 编排)。

定义多智能体内容创作平台的工作流编排逻辑。
"""
from __future__ import annotations

import asyncio
from typing import Any

from langchain.agents import create_agent

from core import build_chat_model
from core.logging_conf import get_logger

from .models import ContentTask, TaskStatus, shared_memory
from .prompts import EDITOR_SYSTEM_PROMPT
from .tools import get_all_tools, plan_content, read_memory

logger = get_logger("p07.content_creation.graph")


class ContentCreationGraph:
"""内容创作工作流图。

编排多个 Agent 协作完成文章创作:
1. 策划 Agent 生成大纲
2. 写作 Agent 撰写各章节,配图 Agent 生成配图建议(两者只依赖大纲,并行执行)
3. 审校 Agent 审查正文质量
4. 整合输出最终文章
"""

def __init__(self) -> None:
self._tasks: dict[str, ContentTask] = {}
self._agent: Any | None = None

def build_agent(self) -> Any:
"""构建主编 Agent。

Returns:
LangChain Agent 对象
"""
if self._agent is not None:
return self._agent

self._agent = create_agent(
model=build_chat_model(),
tools=get_all_tools(),
system_prompt=EDITOR_SYSTEM_PROMPT,
)
return self._agent

def create_task(self, topic: str, task_id: str | None = None) -> ContentTask:
"""创建内容创作任务。

Args:
topic: 文章主题
task_id: 可选任务 ID

Returns:
任务对象
"""
import uuid

if task_id is None:
task_id = f"content_{uuid.uuid4().hex[:8]}"

task = ContentTask(
task_id=task_id,
topic=topic,
status=TaskStatus.PENDING,
)
self._tasks[task_id] = task
logger.info("创建内容创作任务: %s - %s", task_id, topic)
return task

async def run_planner_step(self, task: ContentTask) -> bool:
"""运行策划步骤。

Args:
task: 任务对象

Returns:
是否成功
"""
try:
task.status = TaskStatus.RUNNING
logger.info("[%s] 步骤 1/3: 生成大纲", task.task_id)

outline_text = plan_content.invoke({"topic": task.topic})

if "失败" in outline_text or "⚠️" in outline_text:
logger.error("大纲生成失败: %s", outline_text)
return False

logger.info("大纲生成成功,长度: %d", len(outline_text))
return True
except Exception as e:
logger.error("策划步骤异常: %s", e)
return False

async def run_writer_step(self, task: ContentTask, section_titles: list[str]) -> bool:
"""运行写作步骤。

Args:
task: 任务对象
section_titles: 章节标题列表

Returns:
是否成功
"""
try:
logger.info("[%s] 步骤 2/3: 撰写章节", task.task_id)

from .tools import write_section

for i, title in enumerate(section_titles, 1):
logger.info(" 撰写第 %d/%d 章: %s", i, len(section_titles), title)
content = write_section.invoke({"section_title": title})
if "失败" in content:
logger.warning("章节「%s」撰写失败", title)
continue
task.add_section(title, content, order=i)

logger.info("章节撰写完成,共 %d 章,总字数: %d",
len(task.sections), task.get_total_words())
return len(task.sections) > 0
except Exception as e:
logger.error("写作步骤异常: %s", e)
return False

async def run_reviewer_step(self, task: ContentTask) -> bool:
"""运行审校步骤。

Args:
task: 任务对象

Returns:
是否成功
"""
try:
logger.info("[%s] 步骤 3/3: 内容审校", task.task_id)

from .tools import review_content

feedback = await asyncio.to_thread(review_content.invoke, {})
if "尚无" in feedback or "失败" in feedback:
logger.warning("审校未完成: %s", feedback)
return False

logger.info("审校完成")
return True
except Exception as e:
logger.error("审校步骤异常: %s", e)
return False

async def run_designer_step(self, task: ContentTask) -> bool:
"""运行配图设计步骤。

Args:
task: 任务对象

Returns:
是否成功
"""
try:
logger.info("[%s] 步骤 2/3: 配图设计(与写作并行)", task.task_id)

from .tools import suggest_images

# 用 asyncio.to_thread 把同步工具调用扔进线程池,
# 避免阻塞事件循环,使写作步骤得以与之真正并行
images = await asyncio.to_thread(suggest_images.invoke, {})
if "先生成" in images or "失败" in images:
logger.warning("配图设计未完成: %s", images)
return False

logger.info("配图设计完成")
return True
except Exception as e:
logger.error("配图步骤异常: %s", e)
return False

def assemble_final_article(self, task: ContentTask) -> str:
"""整合最终文章。

Args:
task: 任务对象

Returns:
完整文章 Markdown
"""
memory_summary = read_memory.invoke({"key": ""})

outline = shared_memory.get("outline")
review = shared_memory.get("review_feedback", "")
images = shared_memory.get("images", "")

sections_text = "\n\n".join([
f"## {s.title}\n\n{s.content}"
for s in task.sections
])

final_article = f"""# {task.topic}

---

## 大纲
{outline}

---

## 正文

{sections_text}

---

## 审校反馈
{review or '(暂无)'}

---

## 配图建议
{images or '(暂无)'}
"""
task.final_article = final_article
return final_article

async def run_full_workflow(self, topic: str) -> str:
"""运行完整创作工作流。

Args:
topic: 文章主题

Returns:
最终文章
"""
task = self.create_task(topic)

# 清空之前的记忆
shared_memory.clear()

# 步骤 1: 策划
success = await self.run_planner_step(task)
if not success:
return "❌ 大纲生成失败,请重试"

# 步骤 2: 写作 + 配图 并行
# 两者都只依赖大纲、彼此无依赖:写作写 section:*,配图写 images,
# SharedMemory 是普通 dict,写不同键在 GIL 下并发安全。
agent = self.build_agent()
await asyncio.gather(
self._agent_write_sections(agent, task),
self.run_designer_step(task),
)

# 步骤 3: 审校(依赖写作产出的正文,须等写作完成)
await self.run_reviewer_step(task)

# 整合输出
task.status = TaskStatus.COMPLETED
return self.assemble_final_article(task)

async def _agent_write_sections(self, agent: Any, task: ContentTask) -> None:
"""让 Agent 自主决定写作哪些章节。

Args:
agent: Agent 对象
task: 任务对象
"""
prompt = (
f"文章主题:{task.topic}\n"
"大纲已生成,请根据大纲撰写 2-3 个核心章节。"
"写完每个章节后检查是否还需要继续写。"
)
try:
# 交给 Agent 自主控制写作流程
# 用 asyncio.to_thread 把同步 agent.run 扔进线程池,
# 避免阻塞事件循环,使配图步骤得以与之真正并行
result = await asyncio.to_thread(agent.run, prompt)
logger.info("Agent 自主写作完成")
except Exception as e:
logger.error("Agent 自主写作异常: %s", e)


# 全局 Graph 实例
_graph: ContentCreationGraph | None = None


def get_graph() -> ContentCreationGraph:
"""获取内容创作工作流图实例。

Returns:
ContentCreationGraph 单例
"""
global _graph
if _graph is None:
_graph = ContentCreationGraph()
return _graph

project.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""项目七:项目定义层。

定义多智能体内容创作平台的项目注册和对外接口。
"""
from __future__ import annotations

from typing import Any

from core import BaseProject, registry

from .graph import get_graph
from .tools import get_all_tools


class ContentCreationProject(BaseProject):
"""多智能体内容创作平台。

主讲能力:Multi-Agent 编排 + 共享记忆系统

业务场景:用户给一个主题,多个 Agent 分工协作完成一篇高质量文章:
策划 Agent →(写作 Agent ‖ 配图 Agent)→ 审校 Agent

生产级特性:
- 主管-子智能体编排(Orchestrator Pattern)
- 共享记忆池(各 Agent 输出存入共享上下文,下个 Agent 可见)
- 子 Agent 独立 System Prompt(角色专精,互不干扰)
- 并行执行(写作和配图用 asyncio.gather 同时进行)
- 结构化输出(最终文章为 Markdown 格式)
"""

id = "p07_content_creation"
name = "多智能体内容创作平台"
description = "策划+写作+审校+配图,四 Agent 分工协作,共享记忆。"
capabilities = ["Multi-Agent", "Shared Memory", "Content Creation"]

def build_agent(self) -> Any:
"""构建主编 Agent 实例。

Returns:
LangChain Agent 对象
"""
graph = get_graph()
return graph.build_agent()

def run(self, message: str) -> str:
"""运行内容创作任务。

Args:
message: 用户输入的文章主题或创作需求

Returns:
创作结果
"""
import asyncio

graph = get_graph()

try:
# 尝试运行完整工作流
loop = asyncio.get_event_loop_policy().get_event_loop()
if loop.is_running():
# 如果已有事件循环(如在服务中),使用 Agent 直接运行
agent = self.build_agent()
return agent.run(message)
else:
# 否则异步运行完整工作流
return asyncio.run(graph.run_full_workflow(message))
except Exception as e:
# 降级方案:直接用 Agent 运行
agent = self.build_agent()
return agent.run(message)


# 项目实例
project = ContentCreationProject()

# 对外暴露的快捷函数
run = project.run
build_agent = project.build_agent
get_tools = get_all_tools

__all__ = [
"ContentCreationProject",
"project",
"run",
"build_agent",
"get_tools",
]

init.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""项目七:多智能体内容创作平台(Multi-Agent + 共享记忆)

主讲能力:Multi-Agent 编排 + 共享记忆系统

业务场景:用户给一个主题,多个 Agent 分工协作完成一篇高质量文章:
策划 Agent →(写作 Agent ‖ 配图 Agent)→ 审校 Agent

生产级特性:
- 主管-子智能体编排(Orchestrator Pattern)
- 共享记忆池(各 Agent 输出存入共享上下文,下个 Agent 可见)
- 子 Agent 独立 System Prompt(角色专精,互不干扰)
- 并行执行(写作和配图用 asyncio.gather 同时进行)
- 结构化输出(最终文章为 Markdown 格式)

架构分层:
- models.py: 数据模型、共享记忆池
- prompts.py: 各角色系统提示词
- tools.py: 各 Agent 工具函数
- graph.py: Multi-Agent 工作流编排
- project.py: 项目定义、对外接口
"""
from __future__ import annotations

from core import registry

# 导出 models 层
from .models import (
AgentRole,
ArticleSection,
ContentOutline,
ContentTask,
ImageSuggestion,
ReviewFeedback,
SharedMemory,
shared_memory,
TaskStatus,
)

# 导出 prompts 层
from .prompts import (
ASSEMBLE_PROMPT,
DESIGNER_PROMPT,
EDITOR_SYSTEM_PROMPT,
PLANNER_PROMPT,
REVIEWER_PROMPT,
WRITER_PROMPT,
)

# 导出 tools 层(保持向后兼容)
from .tools import (
clear_memory,
get_all_tools,
plan_content,
read_memory,
review_content,
suggest_images,
write_section,
)

# 导出 graph 层
from .graph import ContentCreationGraph, get_graph

# 导出 project 层
from .project import (
ContentCreationProject,
build_agent,
get_tools,
project,
run,
)

# 向后兼容别名:原单文件版本使用 SYSTEM_PROMPT
SYSTEM_PROMPT = EDITOR_SYSTEM_PROMPT

# 注册项目到全局注册表
registry.register(project)

__all__ = [
# models
"AgentRole",
"TaskStatus",
"ContentOutline",
"ArticleSection",
"ImageSuggestion",
"ReviewFeedback",
"ContentTask",
"SharedMemory",
"shared_memory",
# prompts
"EDITOR_SYSTEM_PROMPT",
"SYSTEM_PROMPT",
"PLANNER_PROMPT",
"WRITER_PROMPT",
"REVIEWER_PROMPT",
"DESIGNER_PROMPT",
"ASSEMBLE_PROMPT",
# tools
"plan_content",
"write_section",
"review_content",
"suggest_images",
"read_memory",
"clear_memory",
"get_all_tools",
# graph
"ContentCreationGraph",
"get_graph",
# project
"ContentCreationProject",
"project",
"run",
"build_agent",
"get_tools",
]

9.4.2 核心代码讲解

1. 共享记忆池(SharedMemory)——编辑部的共享文档

models.py 里的 SharedMemory 是整个平台的状态枢纽,相当于编辑部的共享文档。每个 Agent 干完活,用 shared_memory.put(key, content) 把成果丢进去;下一个 Agent 用 shared_memory.get(key) 取自己要的。比如撰稿人读策划师写入的 outline,审稿人遍历所有 section:* 键把正文汇总起来。记忆池还记着写入顺序(_order),并提供 summary() 摘要与 clear() 重置。这样一来,多个 Agent 谁也不用直接喊谁,只对着“共享文档”读写,就完成了解耦的状态交接。

💡 顿悟时刻:状态共享不一定要靠复杂的消息传递。一个 key-value 的“共享文档”,往往就是最稳妥的协作方式——简单、可观测、谁都能查。

2. 主管-子智能体编排(Orchestrator Pattern)——主编不亲自下场

graph.pyContentCreationGraph 扮演“主编/主管”,调度四个角色工具:策划(plan_content)、写作(write_section)、审校(review_content)、配图(suggest_images)。主编自己不写一个字,只负责拆活、派活,在 run_full_workflow 里把各步骤编排起来,最后由 assemble_final_article 拼版。这就是 Orchestrator 模式的精髓:主管管编排与决策,角色工具管执行,职责清晰,将来想加个“排版 Agent”也只是多挂一个工具的事。

3. 各角色独立 Prompt——一角色一剧本

prompts.py 给每个角色单独写了一份系统提示词:PLANNER_PROMPT 管结构规划、WRITER_PROMPT 管成稿、REVIEWER_PROMPT 管质量审查、DESIGNER_PROMPT 管配图方案,EDITOR_SYSTEM_PROMPT 则约束主编的调度顺序。每份 Prompt 只讲本角色要干嘛、输出长啥样,互不干扰。这种“一角色一剧本”的隔离,让模型上下文更聚焦、输出更稳,也方便单独调优某个角色——比如想换文风,只动 WRITER_PROMPT 就行,别的角色纹丝不碰。

4. Graph 层的异步工作流编排——写作与配图真并行

graph.py 把每个步骤实现成 async 方法(run_planner_step / run_writer_step / run_reviewer_step / run_designer_step),由 run_full_workflow 编排。build_agent 通过 create_agent 构建主编 Agent,工具来自 get_all_tools()。步骤之间用 ContentTask 对象传递状态,每步还打日志标进度(如“步骤 2/3”),整条流水线可观测、可追踪。

💡 关键在并行:策划完大纲后,写作和配图只依赖大纲、彼此无依赖,于是用 asyncio.gather_agent_write_sectionsrun_designer_step 同时发起——这是真并行,不是“串行 await”。两个分支写各自的共享记忆键(写作写 section:*,配图写 images),SharedMemory 是普通 dict,写不同键在 GIL 下并发安全。唯有审校要读正文,所以排在并行环节之后。同步的 agent.run 和工具 invoke 都用 asyncio.to_thread 扔进线程池,避免阻塞事件循环、保证两个分支真能同时跑。

5. 错误降级处理——事件循环探测与兜底

project.pyrun 方法得同时伺候两种场景:脚本里直跑、已经嵌在异步服务里跑。它先用 asyncio.get_event_loop_policy().get_event_loop() 探一下当前有没有正在转的事件循环:若有(loop.is_running()),说明身在异步服务中,不能再 asyncio.run(否则报“事件循环已运行”),于是直接交给主编 Agent 处理;没有的话,才用 asyncio.run(graph.run_full_workflow(message)) 跑完整工作流。万一上述流程抛异常,还会兜底降级成“直接用 Agent 运行”。“事件循环检测 + 异常兜底”这套组合拳,保证项目在不同环境都不至于因事件循环冲突而崩。

6. 三层架构的解耦优势——从一坨代码到一支队伍

模型层(models.py)定数据结构与共享记忆,提示词层(prompts.py)封装角色指令,工具层(tools.py)实现具体能力,编排层(graph.py)组织流程,项目层(project.py)与入口(__init__.py)管注册与对外暴露。各层单向依赖、边界清晰:改某个角色的 Prompt 不碰工具实现,新增一个 Agent 只需加工具和编排步骤,数据模型一行不用动。这套分层,把多智能体系统从“一坨代码”拆成了可独立演进、可单元测试的模块化结构——和把一个人拆成一支编辑部,是同一个道理。


9.5 跑一跑:它真的行吗

光跑通不算完,得测。这支“编辑部”上线前,至少要测这几样:

  • 大纲工具返回结构化章节
  • 配图工具返回多条建议
  • 汇总节点能合并状态
  • 集成测试生成完整 Markdown

把每个角色的产出和最终拼版分别验证,才能保证流水线哪一环都不掉链子。


9.6 送上线:让它上班

该项目在统一管理台中表现为一个“主题输入 → 文档输出”的工作流。前端的执行过程面板会显示 planner、writer、reviewer、illustrator、assemble 等节点流转——用户看到的,就是一支编辑部在流水线上接力交棒。


9.7 回头看:学到了什么

回头看,Multi-Agent 的价值从来不是“把一个问题拆成多个机器人显得高级”,而是用角色隔离降低复杂度。当任务天然包含多个专业角色时——比如内容创作里的策划、写作、审校、配图——多智能体比一个全能 Agent 更稳、更好维护。每个 Agent 只背自己那口锅,上下文干净,输出才干净。

金句:一个人演完整个编辑部,每个角色都演不到位;让每个 Agent 只演一个角色,整台戏才出彩。

说明一下并行的真实情况,免得你误读:写作和配图这两步,graph.py 确实用 asyncio.gather 同时发起,是真并行而非串行 await;审校依赖正文,才排在并行环节之后。本章真正还没做的,是把它升级成 LangGraph 的 StateGraph 原生 fan-out——目前是手写 SharedMemory + asyncio.gather 的轻量编排。把“已经并行”和“还能用更标准的方式并行”分清楚,是工程人的本分。

下一章,我们把这种协作模式搬到一个更严肃的场景——研究分析。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !