主讲能力:Multi-Agent 协作、共享状态、并行节点
业务场景:用户输入一个主题,系统自动组织一支"内容团队"完成策划、写作、审稿、配图建议和最终汇总。
对应代码:backend/projects/p07_content_creation/
9.1 编辑的困境
周六凌晨一点,公众号编辑小李还对着屏幕发呆。一篇《AI Agent 工程化实践》的文章,他从下午写到深夜:先憋大纲,大纲改了三版才动笔写正文;正文写到一半又觉得结构不对,回头改大纲;好不容易写完,自己审一遍发现前后文风不统一;最后还得自己找配图、写图注。
最让他崩溃的不是累,而是“身份切换”——上一秒还在以策划的口吻想“读者到底想看什么”,下一秒就得用审校的眼光挑“这段逻辑通不通”,再下一秒又要切成设计师琢磨“这里配张什么图”。一个人演完整个编辑部,每个角色都演不到位。
小李的处境,正是单 Agent 做内容创作的缩影。把“想清楚写什么、写出来、挑毛病、配图”全塞进一个 Agent 的上下文,它和小李一样分身乏术:一会儿顾结构、一会儿顾文笔、一会儿顾配图,哪样都不精,输出自然散乱、忽高忽低。
后来小李的公司补了个小团队:策划、撰稿、审校、设计各一人。每人只干自己那摊事,彼此接力——策划定大纲交给撰稿,撰稿写完交审校,审校改完配设计给的图,主编最后拼成稿。质量上去了,谁也不必精分。
本项目要做的,就是给 AI 也组一支这样的小编辑部:让多个 Agent 各司其职,而不是逼一个 Agent 全能。对应到角色分工:
| 角色 |
职责 |
| 策划 Agent |
明确主题、受众、文章结构 |
| 写作 Agent |
根据大纲撰写正文 |
| 审稿 Agent |
统一风格、检查逻辑、提出修改建议 |
| 配图 Agent |
生成配图建议和 AI 绘图提示词 |
| 汇总节点 |
输出完整 Markdown 文稿 |
功能需求
- 用户输入主题后自动生成完整文章。
- 各 Agent 角色职责清晰,不互相越界。
- 写作和配图只依赖大纲、彼此无依赖,可并行以提高效率(实现见 9.4.2)。
- 最终输出 Markdown,可直接发布到博客或 CMS。
9.2 画个样子:它该长啥样
整个编辑部怎么接力?看这张流程图就一目了然——策划打头阵定大纲,写作和配图都从大纲分叉出去,审校等正文写完再上场,最后汇总节点把所有人的活儿拼成一篇稿:
%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
Topic["用户主题"] --> Planner["策划 Agent"]
Planner --> Writer["写作 Agent"]
Planner --> Illustrator["配图 Agent"]
Writer --> Reviewer["审稿 Agent"]
Reviewer --> Assemble["汇总节点"]
Illustrator --> Assemble
Assemble --> Final["最终 Markdown 文稿"]
classDef role fill:#e0f7fa,stroke:#00acc1,stroke-width:2px,color:#006064
classDef out fill:#e8f5e9,stroke:#43a047,stroke-width:2px,color:#1b5e20
class Planner,Writer,Reviewer,Illustrator,Assemble role
class Final out
💡 实现说明:图里画的"写作与配图并行"不是空想——graph.py 用 asyncio.gather 把写作和配图同时发起,两者都只依赖大纲、各写各的共享记忆键(section:* 与 images),互不干扰。审校依赖正文,所以排在并行环节之后。细节在 9.4.2 展开。
用户只需输入:
写一篇关于 AI Agent 工程化实践的文章。
系统输出包括:大纲、正文、审稿意见、配图建议。
9.3 拆开看:怎么造出来
先说清楚一个事实,免得你翻代码时犯迷糊:本项目并没有用 LangGraph 的 StateGraph,共享状态也不是一个 State 类型,而是一个自定义的 SharedMemory 类(见 models.py)。工作流由 ContentCreationGraph 这个类手动编排,主编 Agent 用 LangChain 的 create_agent 构建。早期文档里“StateGraph / State”的叫法,更像是设计阶段的设想,落地时换成了更轻的共享记忆池。下面按实际实现来讲。
为什么不做一个“全能 Agent”?
刚接触 Multi-Agent 时,很多人都会冒出一个疑问:现在的大模型这么强,一个 Agent 既能规划、又能写作、又能审校,干嘛非得拆开?拆开不就多一层调度、多一堆通信吗?
这疑问合理,但真上手就明白了。我最早试的就是“全能 Agent”——一个 Prompt 塞进“先想大纲、再写正文、再审稿、最后配图”全套指令。结果很尴尬:模型一会儿忘了写到第几章,一会儿把审稿意见混进正文,配图提示词干脆漏了。上下文越长、角色越杂,它越容易“走神”,因为得在同一份注意力里不停切换人格。
💡 顿悟时刻:人脑也一样。让一个人同时当策划、撰稿、审校、设计,他也会乱。高效编辑部靠的不是“一个全能编辑”,而是“每个岗位只盯一件事”。Agent 同理——角色隔离,本质是给模型减负。一个 Agent 一个身份、一份聚焦的上下文,输出反而稳。
所以 Multi-Agent 的价值不在“显得高级”,而在用角色隔离降低复杂度。策划 Agent 的上下文里只有大纲,写作 Agent 的上下文里只有正文,互不干扰,各自专精。
Agent 之间怎么交接?——共享记忆池
各司其职不等于各干各的。策划写好的大纲,撰稿得看到;撰稿写完的正文,审校得读到。最直觉的做法是 Agent 之间互相传参,但耦合太紧,加一个角色就得改一串调用链。
本项目的做法是引入共享记忆池(SharedMemory):每个 Agent 干完活,把成果往池子里一扔(put);下一个 Agent 上班时,从池子里取自己要的(get)。Agent 之间不见面,只跟记忆池打交道。这就像编辑部的共享文档:策划把大纲放进去,撰稿打开看,写完正文也放进去,审校再打开读。谁也不必认识谁,文档是唯一的交接物。
⚠️ 避坑:共享记忆池好用,但别什么都往里塞。本项目只放各角色的关键产出(大纲、章节正文、审稿意见、配图建议),不放中间的琐碎思考。池子越臃肿,后续 Agent 被无关信息干扰的风险越大。
并行节点为什么可行(设计层面)
看流程图会发现:写作和配图都只依赖大纲,彼此无依赖,理论上能同时开工。这点 9.2 已经提过,这里再强调一次设计意图,方便理解图。
核心思想记牢:每个 Agent 只改自己负责的状态字段,多角色不互相污染;写作和配图只读大纲、各写各的字段,天然具备并行的基础。代码里也确实用 asyncio.gather 把这两步同时发起,9.4.2 讲 run_full_workflow 时细说。
9.4 动手写:三层架构完整代码
接下来看代码。这套多智能体内容创作平台按分层架构组织——从数据模型、提示词、工具,到工作流编排与项目注册,一层一层叠起来。各层职责清晰、单向依赖,和编辑部“各司其职”是同一个思路:谁的地盘谁做主,互不越位。
9.4.1 三层架构完整代码
这套“各司其职”落到代码上,就是六层分明:模型层、提示词层、工具层、编排层、项目层、入口。各层对应文件与职责如下:
下面依次给出每个文件的完整代码,原汁原味,不省一行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160
| """项目七:数据模型层。
定义多智能体内容创作平台的领域模型和数据结构。 """ from __future__ import annotations
from dataclasses import dataclass, field from enum import Enum from typing import Any
class AgentRole(str, Enum): """Agent 角色枚举。""" PLANNER = "planner" WRITER = "writer" REVIEWER = "reviewer" DESIGNER = "designer" EDITOR = "editor"
class TaskStatus(str, Enum): """任务状态枚举。""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed"
@dataclass class ContentOutline: """文章大纲模型。""" topic: str title: str = "" introduction: str = "" sections: list[str] = field(default_factory=list) conclusion: str = ""
def to_markdown(self) -> str: """转换为 Markdown 格式。""" lines = [f"# {self.title or self.topic}", "", "## 导语", self.introduction] for i, section in enumerate(self.sections, 1): lines.extend(["", f"## 第 {i} 章:{section}"]) if self.conclusion: lines.extend(["", "## 结语", self.conclusion]) return "\n".join(lines)
@dataclass class ArticleSection: """文章章节模型。""" title: str content: str order: int = 0 word_count: int = 0
def __post_init__(self) -> None: """计算字数。""" self.word_count = len(self.content)
@dataclass class ImageSuggestion: """配图建议模型。""" section_title: str image_type: str prompt: str description: str = ""
@dataclass class ReviewFeedback: """审稿反馈模型。""" overall_score: int strengths: list[str] = field(default_factory=list) issues: list[str] = field(default_factory=list) suggestions: list[str] = field(default_factory=list)
@dataclass class ContentTask: """内容创作任务模型。""" task_id: str topic: str status: TaskStatus = TaskStatus.PENDING outline: ContentOutline | None = None sections: list[ArticleSection] = field(default_factory=list) images: list[ImageSuggestion] = field(default_factory=list) review: ReviewFeedback | None = None final_article: str = ""
def add_section(self, title: str, content: str, order: int = 0) -> None: """添加章节。""" self.sections.append(ArticleSection( title=title, content=content, order=order, ))
def get_total_words(self) -> int: """获取总字数。""" return sum(s.word_count for s in self.sections)
class SharedMemory: """多 Agent 共享记忆池。
所有 Agent 的输出都存入这里,后续 Agent 可读取。 """
def __init__(self) -> None: self._store: dict[str, Any] = {} self._order: list[str] = []
def put(self, key: str, content: Any) -> None: """存入记忆。
Args: key: 记忆键 content: 记忆内容 """ self._store[key] = content if key not in self._order: self._order.append(key)
def get(self, key: str) -> Any: """读取记忆。
Args: key: 记忆键
Returns: 记忆内容,不存在返回空字符串 """ return self._store.get(key, "")
def keys(self) -> list[str]: """获取所有记忆键。""" return list(self._order)
def summary(self) -> str: """生成记忆池摘要。
Returns: 摘要文本 """ lines = ["📦 共享记忆池内容:", ""] for k in self._order: content = self._store[k] preview = str(content)[:80].replace("\n", " ") lines.append(f" [{k}] {preview}...") return "\n".join(lines)
def clear(self) -> None: """清空记忆池。""" self._store.clear() self._order.clear()
shared_memory = SharedMemory()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
| """项目七:Prompt 层。
定义多智能体内容创作平台各角色的系统提示词。 """ from __future__ import annotations
EDITOR_SYSTEM_PROMPT = """你是内容创作平台的主编 Agent,负责协调团队完成高质量文章创作。
## 你的团队 - 👨💼 策划师 (plan_content):生成文章大纲 - ✍️ 撰稿人 (write_section):撰写各章节内容 - 🔍 审稿人 (review_content):审查质量并提供反馈 - 🎨 配图设计师 (suggest_images):提供配图建议
## 工作流程(严格执行) 1. **第一步**:调用 plan_content 生成文章大纲 2. **第二步**:调用 write_section 逐章节撰写(至少写 2-3 个章节) 3. **第三步**:全部写完后调用 review_content 进行审校 4. **第四步**:调用 suggest_images 为文章配图 5. **第五步**:合并所有内容,输出完整文章
## 质量要求 - 文章结构清晰,逻辑连贯 - 语言通俗易懂,适合目标读者 - 每章节 800-1500 字 - 配图建议具体,可用于 AI 生成
## 最终交付格式 ```markdown # 文章标题
## 导语 (导语内容)
## 章节 1 (章节内容)
## 章节 2 (章节内容)
## 结语 (结语内容)
---
## 审校反馈 (审稿意见)
---
## 配图建议 (每张图的建议) ```"""
PLANNER_PROMPT = """你是资深内容策划师,擅长设计高质量文章的结构和大纲。
请为主题「{topic}」生成详细的文章大纲。
## 大纲要求 1. 吸引人的标题(结合热点和 SEO) 2. 100-200 字的导语,说明文章价值 3. 3-5 个核心章节,每章标题明确 4. 简短的结语,总结要点并引导行动 5. 明确每个章节的核心观点和写作方向
## 输出格式 ```markdown # 文章标题
## 导语 (导语内容)
## 章节列表 1. 第一章标题 - (本章核心要点) 2. 第二章标题 - (本章核心要点) 3. 第三章标题 - (本章核心要点)
## 结语 (结语方向) ```"""
WRITER_PROMPT = """你是专业撰稿人,文风通俗生动,擅长把复杂概念讲清楚。
## 文章大纲 {outline}
## 当前任务 请撰写章节「{section_title}」的完整内容。
## 写作要求 1. 800-1500 字 2. 段落分明,每段不超过 5 行 3. 使用生活化案例辅助理解 4. 适当使用列表和加粗强调重点 5. 结尾过渡到下一章或总结本章要点
## 目标读者 互联网行业从业者、产品经理、工程师,具有一定技术背景但非专家。
请直接输出章节内容,不需要解释写作思路。"""
REVIEWER_PROMPT = """你是首席编辑,具有 10 年以上审稿经验。
请审查以下文章内容,给出专业反馈。
## 已完成内容 {all_content}
## 审查维度 1. **逻辑连贯性**:章节之间是否衔接自然?论点是否有足够支撑? 2. **语言表达**:是否有语法错误?语句是否通顺易懂? 3. **内容质量**:是否有遗漏的重要观点?案例是否贴切? 4. **可读性**:段落长度是否合适?重点是否突出?
## 输出格式 ``` 【整体评分】X/10
【优点】 - (列出 2-3 个优点)
【问题】 - (列出具体问题及位置)
【修改建议】 - (具体可操作的建议) ```"""
DESIGNER_PROMPT = """你是视觉设计师,擅长为文章设计配图方案。
## 文章大纲 {outline}
## 任务 为每个章节设计 1-2 张配图建议。
## 配图要求 1. 图片类型:示意图、数据图、场景图、概念图四选一 2. AI 生成提示词:英文,简洁准确,包含主体、风格、色调 3. 中文描述:说明图片在文章中的作用和位置
## 输出格式 ``` ## 第 N 章:章节标题
### 配图 1:[图片类型] 英文 Prompt: (stable diffusion / DALL-E prompt) 说明:(这张图表达什么,放在哪里合适)
### 配图 2:[图片类型] 英文 Prompt: ... 说明:... ```"""
ASSEMBLE_PROMPT = """请将以下内容整合成一篇完整的文章。
## 大纲 {outline}
## 已完成章节 {sections_text}
## 审校反馈 {review_text}
## 配图建议 {images_text}
要求: 1. 保持 Markdown 格式 2. 标题层级正确 3. 各部分之间有清晰的分隔线 4. 格式美观,便于阅读"""
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
| """项目七:工具层。
定义多智能体内容创作平台的各角色工具函数。 """ from __future__ import annotations
from typing import Any
from langchain.tools import tool
from core import build_chat_model from core.logging_conf import get_logger
from .models import shared_memory from .prompts import ( DESIGNER_PROMPT, PLANNER_PROMPT, REVIEWER_PROMPT, WRITER_PROMPT, )
logger = get_logger("p07.content_creation.tools")
@tool def plan_content(topic: str) -> str: """策划师:根据主题生成内容大纲和结构规划。
Args: topic: 文章主题
Returns: Markdown 格式的大纲(标题 + 导语 + 章节列表 + 结语) """ logger.info("策划师开始工作,主题: %s", topic)
prompt = PLANNER_PROMPT.format(topic=topic)
try: model = build_chat_model() result = model.invoke(prompt) outline = result.content if hasattr(result, "content") else str(result)
shared_memory.put("outline", outline) shared_memory.put("topic", topic)
logger.info("大纲生成完成,长度: %d", len(outline)) return outline except Exception as e: logger.error("大纲生成失败: %s", e) return f"❌ 大纲生成失败: {e}"
@tool def write_section(section_title: str) -> str: """撰稿人:根据大纲中指定的章节标题,撰写该章节的完整内容。
会先读取共享记忆中的大纲,确保内容与大纲一致。
Args: section_title: 要撰写的章节标题
Returns: 章节内容 """ logger.info("撰稿人开始工作,章节: %s", section_title)
outline = shared_memory.get("outline") if not outline: return "⚠️ 请先生成文章大纲,再开始写作。"
prompt = WRITER_PROMPT.format( outline=outline, section_title=section_title, )
try: model = build_chat_model() result = model.invoke(prompt) content = result.content if hasattr(result, "content") else str(result)
key = f"section:{section_title}" shared_memory.put(key, content)
logger.info("章节「%s」撰写完成,字数: %d", section_title, len(content)) return content except Exception as e: logger.error("章节撰写失败: %s", e) return f"❌ 章节撰写失败: {e}"
@tool def review_content() -> str: """审稿人:审查共享记忆中所有已完成内容,检查逻辑、语法、可读性。
Returns: 审校意见(问题列表 + 修改建议) """ logger.info("审稿人开始工作")
all_content = [] section_count = 0
for key in shared_memory.keys(): if key.startswith("section:"): content = shared_memory.get(key) section_title = key[8:] all_content.append(f"--- {section_title} ---\n{content}") section_count += 1
if section_count == 0: return "⚠️ 尚无内容可供审校,请先撰写至少一个章节。"
combined = "\n\n".join(all_content)
prompt = REVIEWER_PROMPT.format(all_content=combined[:8000])
try: model = build_chat_model() result = model.invoke(prompt) feedback = result.content if hasattr(result, "content") else str(result)
shared_memory.put("review_feedback", feedback)
logger.info("审校完成") return feedback except Exception as e: logger.error("审校失败: %s", e) return f"❌ 审校失败: {e}"
@tool def suggest_images() -> str: """配图设计师:根据文章大纲,为每个章节建议配图方案。
Returns: 配图建议列表(图片类型 + AI 生成 prompt) """ logger.info("配图设计师开始工作")
outline = shared_memory.get("outline") if not outline: return "⚠️ 请先生成文章大纲。"
prompt = DESIGNER_PROMPT.format(outline=outline[:3000])
try: model = build_chat_model() result = model.invoke(prompt) images = result.content if hasattr(result, "content") else str(result)
shared_memory.put("images", images)
logger.info("配图建议生成完成") return images except Exception as e: logger.error("配图建议生成失败: %s", e) return f"❌ 配图建议生成失败: {e}"
@tool def read_memory(key: str = "") -> str: """读取共享记忆池中的内容。
Args: key: 记忆键,留空查看所有可用的记忆键和摘要
Returns: 记忆内容或摘要 """ if not key: return shared_memory.summary()
content = shared_memory.get(key) if not content: return f"⚠️ 记忆键 '{key}' 不存在,可用的键有:{', '.join(shared_memory.keys())}" return content
@tool def clear_memory() -> str: """清空共享记忆池,用于开始新的创作任务。
Returns: 操作结果 """ shared_memory.clear() logger.info("共享记忆池已清空") return "✅ 共享记忆池已清空,可以开始新的创作任务。"
def get_all_tools() -> list[Any]: """获取所有可用工具列表。
Returns: 工具对象列表 """ return [ plan_content, write_section, review_content, suggest_images, read_memory, clear_memory, ]
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
| """项目七:Graph 层(Multi-Agent 编排)。
定义多智能体内容创作平台的工作流编排逻辑。 """ from __future__ import annotations
import asyncio from typing import Any
from langchain.agents import create_agent
from core import build_chat_model from core.logging_conf import get_logger
from .models import ContentTask, TaskStatus, shared_memory from .prompts import EDITOR_SYSTEM_PROMPT from .tools import get_all_tools, plan_content, read_memory
logger = get_logger("p07.content_creation.graph")
class ContentCreationGraph: """内容创作工作流图。
编排多个 Agent 协作完成文章创作: 1. 策划 Agent 生成大纲 2. 写作 Agent 撰写各章节,配图 Agent 生成配图建议(两者只依赖大纲,并行执行) 3. 审校 Agent 审查正文质量 4. 整合输出最终文章 """
def __init__(self) -> None: self._tasks: dict[str, ContentTask] = {} self._agent: Any | None = None
def build_agent(self) -> Any: """构建主编 Agent。
Returns: LangChain Agent 对象 """ if self._agent is not None: return self._agent
self._agent = create_agent( model=build_chat_model(), tools=get_all_tools(), system_prompt=EDITOR_SYSTEM_PROMPT, ) return self._agent
def create_task(self, topic: str, task_id: str | None = None) -> ContentTask: """创建内容创作任务。
Args: topic: 文章主题 task_id: 可选任务 ID
Returns: 任务对象 """ import uuid
if task_id is None: task_id = f"content_{uuid.uuid4().hex[:8]}"
task = ContentTask( task_id=task_id, topic=topic, status=TaskStatus.PENDING, ) self._tasks[task_id] = task logger.info("创建内容创作任务: %s - %s", task_id, topic) return task
async def run_planner_step(self, task: ContentTask) -> bool: """运行策划步骤。
Args: task: 任务对象
Returns: 是否成功 """ try: task.status = TaskStatus.RUNNING logger.info("[%s] 步骤 1/3: 生成大纲", task.task_id)
outline_text = plan_content.invoke({"topic": task.topic})
if "失败" in outline_text or "⚠️" in outline_text: logger.error("大纲生成失败: %s", outline_text) return False
logger.info("大纲生成成功,长度: %d", len(outline_text)) return True except Exception as e: logger.error("策划步骤异常: %s", e) return False
async def run_writer_step(self, task: ContentTask, section_titles: list[str]) -> bool: """运行写作步骤。
Args: task: 任务对象 section_titles: 章节标题列表
Returns: 是否成功 """ try: logger.info("[%s] 步骤 2/3: 撰写章节", task.task_id)
from .tools import write_section
for i, title in enumerate(section_titles, 1): logger.info(" 撰写第 %d/%d 章: %s", i, len(section_titles), title) content = write_section.invoke({"section_title": title}) if "失败" in content: logger.warning("章节「%s」撰写失败", title) continue task.add_section(title, content, order=i)
logger.info("章节撰写完成,共 %d 章,总字数: %d", len(task.sections), task.get_total_words()) return len(task.sections) > 0 except Exception as e: logger.error("写作步骤异常: %s", e) return False
async def run_reviewer_step(self, task: ContentTask) -> bool: """运行审校步骤。
Args: task: 任务对象
Returns: 是否成功 """ try: logger.info("[%s] 步骤 3/3: 内容审校", task.task_id)
from .tools import review_content
feedback = await asyncio.to_thread(review_content.invoke, {}) if "尚无" in feedback or "失败" in feedback: logger.warning("审校未完成: %s", feedback) return False
logger.info("审校完成") return True except Exception as e: logger.error("审校步骤异常: %s", e) return False
async def run_designer_step(self, task: ContentTask) -> bool: """运行配图设计步骤。
Args: task: 任务对象
Returns: 是否成功 """ try: logger.info("[%s] 步骤 2/3: 配图设计(与写作并行)", task.task_id)
from .tools import suggest_images
images = await asyncio.to_thread(suggest_images.invoke, {}) if "先生成" in images or "失败" in images: logger.warning("配图设计未完成: %s", images) return False
logger.info("配图设计完成") return True except Exception as e: logger.error("配图步骤异常: %s", e) return False
def assemble_final_article(self, task: ContentTask) -> str: """整合最终文章。
Args: task: 任务对象
Returns: 完整文章 Markdown """ memory_summary = read_memory.invoke({"key": ""})
outline = shared_memory.get("outline") review = shared_memory.get("review_feedback", "") images = shared_memory.get("images", "")
sections_text = "\n\n".join([ f"## {s.title}\n\n{s.content}" for s in task.sections ])
final_article = f"""# {task.topic}
---
## 大纲 {outline}
---
## 正文
{sections_text}
---
## 审校反馈 {review or '(暂无)'}
---
## 配图建议 {images or '(暂无)'} """ task.final_article = final_article return final_article
async def run_full_workflow(self, topic: str) -> str: """运行完整创作工作流。
Args: topic: 文章主题
Returns: 最终文章 """ task = self.create_task(topic)
shared_memory.clear()
success = await self.run_planner_step(task) if not success: return "❌ 大纲生成失败,请重试"
agent = self.build_agent() await asyncio.gather( self._agent_write_sections(agent, task), self.run_designer_step(task), )
await self.run_reviewer_step(task)
task.status = TaskStatus.COMPLETED return self.assemble_final_article(task)
async def _agent_write_sections(self, agent: Any, task: ContentTask) -> None: """让 Agent 自主决定写作哪些章节。
Args: agent: Agent 对象 task: 任务对象 """ prompt = ( f"文章主题:{task.topic}\n" "大纲已生成,请根据大纲撰写 2-3 个核心章节。" "写完每个章节后检查是否还需要继续写。" ) try: result = await asyncio.to_thread(agent.run, prompt) logger.info("Agent 自主写作完成") except Exception as e: logger.error("Agent 自主写作异常: %s", e)
_graph: ContentCreationGraph | None = None
def get_graph() -> ContentCreationGraph: """获取内容创作工作流图实例。
Returns: ContentCreationGraph 单例 """ global _graph if _graph is None: _graph = ContentCreationGraph() return _graph
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| """项目七:项目定义层。
定义多智能体内容创作平台的项目注册和对外接口。 """ from __future__ import annotations
from typing import Any
from core import BaseProject, registry
from .graph import get_graph from .tools import get_all_tools
class ContentCreationProject(BaseProject): """多智能体内容创作平台。
主讲能力:Multi-Agent 编排 + 共享记忆系统
业务场景:用户给一个主题,多个 Agent 分工协作完成一篇高质量文章: 策划 Agent →(写作 Agent ‖ 配图 Agent)→ 审校 Agent
生产级特性: - 主管-子智能体编排(Orchestrator Pattern) - 共享记忆池(各 Agent 输出存入共享上下文,下个 Agent 可见) - 子 Agent 独立 System Prompt(角色专精,互不干扰) - 并行执行(写作和配图用 asyncio.gather 同时进行) - 结构化输出(最终文章为 Markdown 格式) """
id = "p07_content_creation" name = "多智能体内容创作平台" description = "策划+写作+审校+配图,四 Agent 分工协作,共享记忆。" capabilities = ["Multi-Agent", "Shared Memory", "Content Creation"]
def build_agent(self) -> Any: """构建主编 Agent 实例。
Returns: LangChain Agent 对象 """ graph = get_graph() return graph.build_agent()
def run(self, message: str) -> str: """运行内容创作任务。
Args: message: 用户输入的文章主题或创作需求
Returns: 创作结果 """ import asyncio
graph = get_graph()
try: loop = asyncio.get_event_loop_policy().get_event_loop() if loop.is_running(): agent = self.build_agent() return agent.run(message) else: return asyncio.run(graph.run_full_workflow(message)) except Exception as e: agent = self.build_agent() return agent.run(message)
project = ContentCreationProject()
run = project.run build_agent = project.build_agent get_tools = get_all_tools
__all__ = [ "ContentCreationProject", "project", "run", "build_agent", "get_tools", ]
|
init.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| """项目七:多智能体内容创作平台(Multi-Agent + 共享记忆)
主讲能力:Multi-Agent 编排 + 共享记忆系统
业务场景:用户给一个主题,多个 Agent 分工协作完成一篇高质量文章: 策划 Agent →(写作 Agent ‖ 配图 Agent)→ 审校 Agent
生产级特性: - 主管-子智能体编排(Orchestrator Pattern) - 共享记忆池(各 Agent 输出存入共享上下文,下个 Agent 可见) - 子 Agent 独立 System Prompt(角色专精,互不干扰) - 并行执行(写作和配图用 asyncio.gather 同时进行) - 结构化输出(最终文章为 Markdown 格式)
架构分层: - models.py: 数据模型、共享记忆池 - prompts.py: 各角色系统提示词 - tools.py: 各 Agent 工具函数 - graph.py: Multi-Agent 工作流编排 - project.py: 项目定义、对外接口 """ from __future__ import annotations
from core import registry
from .models import ( AgentRole, ArticleSection, ContentOutline, ContentTask, ImageSuggestion, ReviewFeedback, SharedMemory, shared_memory, TaskStatus, )
from .prompts import ( ASSEMBLE_PROMPT, DESIGNER_PROMPT, EDITOR_SYSTEM_PROMPT, PLANNER_PROMPT, REVIEWER_PROMPT, WRITER_PROMPT, )
from .tools import ( clear_memory, get_all_tools, plan_content, read_memory, review_content, suggest_images, write_section, )
from .graph import ContentCreationGraph, get_graph
from .project import ( ContentCreationProject, build_agent, get_tools, project, run, )
SYSTEM_PROMPT = EDITOR_SYSTEM_PROMPT
registry.register(project)
__all__ = [ "AgentRole", "TaskStatus", "ContentOutline", "ArticleSection", "ImageSuggestion", "ReviewFeedback", "ContentTask", "SharedMemory", "shared_memory", "EDITOR_SYSTEM_PROMPT", "SYSTEM_PROMPT", "PLANNER_PROMPT", "WRITER_PROMPT", "REVIEWER_PROMPT", "DESIGNER_PROMPT", "ASSEMBLE_PROMPT", "plan_content", "write_section", "review_content", "suggest_images", "read_memory", "clear_memory", "get_all_tools", "ContentCreationGraph", "get_graph", "ContentCreationProject", "project", "run", "build_agent", "get_tools", ]
|
9.4.2 核心代码讲解
1. 共享记忆池(SharedMemory)——编辑部的共享文档
models.py 里的 SharedMemory 是整个平台的状态枢纽,相当于编辑部的共享文档。每个 Agent 干完活,用 shared_memory.put(key, content) 把成果丢进去;下一个 Agent 用 shared_memory.get(key) 取自己要的。比如撰稿人读策划师写入的 outline,审稿人遍历所有 section:* 键把正文汇总起来。记忆池还记着写入顺序(_order),并提供 summary() 摘要与 clear() 重置。这样一来,多个 Agent 谁也不用直接喊谁,只对着“共享文档”读写,就完成了解耦的状态交接。
💡 顿悟时刻:状态共享不一定要靠复杂的消息传递。一个 key-value 的“共享文档”,往往就是最稳妥的协作方式——简单、可观测、谁都能查。
2. 主管-子智能体编排(Orchestrator Pattern)——主编不亲自下场
graph.py 的 ContentCreationGraph 扮演“主编/主管”,调度四个角色工具:策划(plan_content)、写作(write_section)、审校(review_content)、配图(suggest_images)。主编自己不写一个字,只负责拆活、派活,在 run_full_workflow 里把各步骤编排起来,最后由 assemble_final_article 拼版。这就是 Orchestrator 模式的精髓:主管管编排与决策,角色工具管执行,职责清晰,将来想加个“排版 Agent”也只是多挂一个工具的事。
3. 各角色独立 Prompt——一角色一剧本
prompts.py 给每个角色单独写了一份系统提示词:PLANNER_PROMPT 管结构规划、WRITER_PROMPT 管成稿、REVIEWER_PROMPT 管质量审查、DESIGNER_PROMPT 管配图方案,EDITOR_SYSTEM_PROMPT 则约束主编的调度顺序。每份 Prompt 只讲本角色要干嘛、输出长啥样,互不干扰。这种“一角色一剧本”的隔离,让模型上下文更聚焦、输出更稳,也方便单独调优某个角色——比如想换文风,只动 WRITER_PROMPT 就行,别的角色纹丝不碰。
4. Graph 层的异步工作流编排——写作与配图真并行
graph.py 把每个步骤实现成 async 方法(run_planner_step / run_writer_step / run_reviewer_step / run_designer_step),由 run_full_workflow 编排。build_agent 通过 create_agent 构建主编 Agent,工具来自 get_all_tools()。步骤之间用 ContentTask 对象传递状态,每步还打日志标进度(如“步骤 2/3”),整条流水线可观测、可追踪。
💡 关键在并行:策划完大纲后,写作和配图只依赖大纲、彼此无依赖,于是用 asyncio.gather 把 _agent_write_sections 和 run_designer_step 同时发起——这是真并行,不是“串行 await”。两个分支写各自的共享记忆键(写作写 section:*,配图写 images),SharedMemory 是普通 dict,写不同键在 GIL 下并发安全。唯有审校要读正文,所以排在并行环节之后。同步的 agent.run 和工具 invoke 都用 asyncio.to_thread 扔进线程池,避免阻塞事件循环、保证两个分支真能同时跑。
5. 错误降级处理——事件循环探测与兜底
project.py 的 run 方法得同时伺候两种场景:脚本里直跑、已经嵌在异步服务里跑。它先用 asyncio.get_event_loop_policy().get_event_loop() 探一下当前有没有正在转的事件循环:若有(loop.is_running()),说明身在异步服务中,不能再 asyncio.run(否则报“事件循环已运行”),于是直接交给主编 Agent 处理;没有的话,才用 asyncio.run(graph.run_full_workflow(message)) 跑完整工作流。万一上述流程抛异常,还会兜底降级成“直接用 Agent 运行”。“事件循环检测 + 异常兜底”这套组合拳,保证项目在不同环境都不至于因事件循环冲突而崩。
6. 三层架构的解耦优势——从一坨代码到一支队伍
模型层(models.py)定数据结构与共享记忆,提示词层(prompts.py)封装角色指令,工具层(tools.py)实现具体能力,编排层(graph.py)组织流程,项目层(project.py)与入口(__init__.py)管注册与对外暴露。各层单向依赖、边界清晰:改某个角色的 Prompt 不碰工具实现,新增一个 Agent 只需加工具和编排步骤,数据模型一行不用动。这套分层,把多智能体系统从“一坨代码”拆成了可独立演进、可单元测试的模块化结构——和把一个人拆成一支编辑部,是同一个道理。
9.5 跑一跑:它真的行吗
光跑通不算完,得测。这支“编辑部”上线前,至少要测这几样:
- 大纲工具返回结构化章节
- 配图工具返回多条建议
- 汇总节点能合并状态
- 集成测试生成完整 Markdown
把每个角色的产出和最终拼版分别验证,才能保证流水线哪一环都不掉链子。
9.6 送上线:让它上班
该项目在统一管理台中表现为一个“主题输入 → 文档输出”的工作流。前端的执行过程面板会显示 planner、writer、reviewer、illustrator、assemble 等节点流转——用户看到的,就是一支编辑部在流水线上接力交棒。
9.7 回头看:学到了什么
回头看,Multi-Agent 的价值从来不是“把一个问题拆成多个机器人显得高级”,而是用角色隔离降低复杂度。当任务天然包含多个专业角色时——比如内容创作里的策划、写作、审校、配图——多智能体比一个全能 Agent 更稳、更好维护。每个 Agent 只背自己那口锅,上下文干净,输出才干净。
金句:一个人演完整个编辑部,每个角色都演不到位;让每个 Agent 只演一个角色,整台戏才出彩。
说明一下并行的真实情况,免得你误读:写作和配图这两步,graph.py 确实用 asyncio.gather 同时发起,是真并行而非串行 await;审校依赖正文,才排在并行环节之后。本章真正还没做的,是把它升级成 LangGraph 的 StateGraph 原生 fan-out——目前是手写 SharedMemory + asyncio.gather 的轻量编排。把“已经并行”和“还能用更标准的方式并行”分清楚,是工程人的本分。
下一章,我们把这种协作模式搬到一个更严肃的场景——研究分析。
如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !