AI项目实战(5)企业知识库问答 Agent

《AI Agent 实战》系列 · 企业知识库问答 Agent

Posted by Ryan on 2026-06-27
Estimated Reading Time 46 Minutes
Words 10.7k In Total
Viewed Times

小李入职第三天,报销单填到一半卡住了。"差旅住宿标准是多少来着?"他扭头问隔壁工位的老王。老王挠挠头:“这个……好像是 500?还是 800?你翻翻员工手册吧,在 OA 系统的某个角落。”

小李打开 OA,点开《员工手册》——287 页。Ctrl+F 搜"差旅",跳出来 40 多处,每一处都似是而非。他又翻《报销制度》《差旅管理办法》,三份文档口径还不太一样。

这时候有人提议:"要不问问那个新上的 AI 助手?“小李半信半疑地输入"差旅住宿标准多少”,AI 秒回:"根据公司规定,差旅住宿标准为一线城市 600 元/晚,二线城市 400 元/晚。"小李喜出望外,照着填了单子。

第二天财务退单:"这是去年的标准,今年 4 月已经调过了,一线 700,二线 500。"小李一脸懵——AI 怎么还停留在旧规矩?更要命的是,他根本无从核实那句"600"是 AI 从哪看来的:是文档里的,还是它自己编的,分不清。

这就是 AI 客服在企业内部规矩面前的经典翻车:一本正经地胡说八道——业界管这叫"幻觉"(hallucination),而且不交代出处,让你想核对都无从下手。

病根在哪?AI 的知识停留在训练数据的某个时间点,它不知道你公司 4 月改过差旅标准;它手头也没有那本《员工手册》可翻。怎么办?干脆给它一本开卷用的参考书,让它先翻书、再答题——这就是本章的主角:RAG(Retrieval-Augmented Generation,检索增强生成)。

全书主线进度:第 3 章我们让 Agent 学会了"用工具做事"。但客服 Agent 有一个重大局限——它的知识(退换货政策)是硬编码在代码里的。真实企业有上百份 PDF、Word、Markdown 文档在不断更新,不可能靠开发者手动维护。这一章我们攻克 RAG——让 Agent 能读任何格式的文档、自己查、自己引用,把"闭卷考试变成开卷考试"。

主讲能力:RAG(检索增强生成)—— LangGraph 编排检索流程 + LlamaIndex 向量索引
业务场景:员工问"年假有几天"“报销流程是什么”,Agent 基于企业文档精准回答,附带引用出处。
技术栈:LangChain v1.0 create_agent + LlamaIndex VectorStoreIndex + HuggingFace Embedding + ChromaDB


4.1 小李的疑惑

4.1.1 业务背景与痛点

把视角从员工切到 HR。一家 500 人的公司,HR 小张每天工位上弹的消息上百条,翻来覆去就那几个问题:“年假几天?”“报销怎么走?”"远程办公能申请几天?"答案其实都有,就躺在几十份文档里——《员工手册》《报销制度》《考勤规定》……可员工找不到,HR 复制粘贴到手指发麻。

小张也试过把这些问题整理成 FAQ,可政策三天两头更新,FAQ 永远滞后半拍。她甚至动过念头:要不要训练一个懂公司业务的 AI?一打听,微调要 GPU、要标注数据、要算法工程师,还得每次改政策都重训一遍——HR 部门那点预算,养不起。

💡 顿悟时刻:RAG 不是让模型"变聪明",而是让模型"有据可查"。聪明还是那个聪明,但它从"凭记忆答题"变成了"翻着书答题"。

金句:微调是把知识背进脑子里,RAG 是把知识摆在桌子上。

那为什么不微调,偏要用 RAG? 这是几乎每个人都会问的反直觉问题。微调确实能让模型"记住"点东西,但放进企业知识库这个场景,它有三个硬伤:

  1. 知识会过期。差旅标准 4 月改了,微调过的模型还停留在旧版。要更新?重新标数据、重训、重评,一两周起步。而 RAG 这边,新文档丢进去,下一秒就能问。
  2. 微调学不会"出处"。微调把知识揉进参数,你问"这话哪来的",它答不上来——知识已经和权重融为一体,没有"第几页第几行"。可引用溯源恰恰是企业刚需:法务要核查、员工要追溯。
  3. 微调贵且重。GPU、标注、ML 工程师,三座大山。企业只想让员工问个年假,不值得为这个养一支算法团队。

⚠️ 避坑:微调改变的是模型的"能力与风格"(比如让它学会用你们公司的口吻说话),RAG 补的是模型的"知识与事实"。两者并非非此即彼,但在"知识频繁更新 + 需要出处"的场景,RAG 几乎总是更优解。

心路历程小结:一开始我也觉得微调更"高级",直到亲眼看见差旅标准改一次就得重训一次模型,才想通——对于会变的、需要核对的事实,开卷考试永远比闭卷考试靠谱。

把这堆文档"喂"给 RAG Agent,让员工在对话框里直接问,Agent 自动检索、贴出原文、标上出处——这件事不需要训练、不需要微调,一份文档丢进去就能问

4.1.2 用户故事

编号 作为 我想要 以便
US-1 员工 用自然语言问公司政策,AI 直接回答 不用翻几十页 PDF
US-2 员工 答案附着出处(哪份文档、哪一段) 能核实、能追溯
US-3 HR 上传新政策文档后 AI 立刻能答 不用每次找人更新代码
US-4 法务 AI 不知道就说"不知道",绝不编造 政策回答不能有错

4.1.3 功能性需求

  • FR-1 文档索引:支持 PDF、Word、Markdown、TXT,上传后自动分块、向量化、入库
  • FR-2 语义检索:根据用户问题检索最相关的文档片段(混合关键词 + 语义)
  • FR-3 带出处回答:每个回答标注来源于哪个文档(如"来源:员工手册.pdf")
  • FR-4 防幻觉:检索结果为空时回答"未在企业知识库中找到",不编造
  • FR-5 增量更新:新增文档不重建全量索引

4.1.4 非功能性需求

  • 千份文档检索 < 1 秒
  • 相似度低于阈值(0.6)的检索结果自动丢弃
  • 支持超大文档(单文档 100+ 页)

4.2 画个样子:它该长啥样

需求理清了,接下来把这事儿"长什么样"画出来。

4.2.1 功能架构图

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
    U["员工提问"] --> Agent["知识库问答 Agent"]
    Agent --> T1["🔧 retrieve_knowledge
检索相关文档片段"] Agent --> T2["🔧 add_document_to_kb
上传新文档入库"] T1 --> RAG["RAGEngine
LlamaIndex + HuggingFace Embedding"] RAG --> VS[("向量数据库
ChromaDB
语义 + 关键词混合检索")] T1 --> Reply["带出处的回答"] T2 --> Reply classDef uN fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1 classDef aN fill:#ede7f6,stroke:#5e35b1,stroke-width:2px,color:#311b92 classDef tN fill:#e0f7fa,stroke:#00acc1,stroke-width:2px,color:#006064 classDef rN fill:#e8f5e9,stroke:#43a047,stroke-width:2px,color:#1b5e20 class U uN class Agent aN class T1,T2,RAG,VS tN class Reply rN

4.2.2 核心交互流程

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
sequenceDiagram
    participant S as 员工
    participant A as Agent
    participant R as RAGEngine
    participant DB as 向量库

    S->>A: "年假有几天?"
    Note over A: 判断:需要查知识库
    A->>R: retrieve_knowledge("年假")
    R->>DB: 向量检索(语义+BM25)
    DB->>R: top-5 相关片段
    R->>A: [1] 来源《员工手册》入职满 1 年享 5 天...
    Note over A: 基于检索结果组织回答
    A->>S: "入职满 1 年享 5 天年假。【来源:员工手册.pdf】"

4.3 拆开看:怎么造出来

功能定了,怎么落地?先看系统全貌,再拆三层。

4.3.1 系统架构图

%%{init: {'theme':'base','flowchart':{'useMaxWidth':true,'htmlLabels':true}}}%%
graph TD
    U["用户"] --> API["FastAPI"]
    API --> P02["项目二 Agent
create_agent"] P02 --> T1["retrieve_knowledge"] P02 --> T2["add_document_to_kb"] T1 & T2 --> RAG["RAGEngine(单例)"] RAG --> Parser["SentenceSplitter
chunk_size=800, overlap=100"] RAG --> Embed["HuggingFaceEmbedding
BAAI/bge-small-zh-v1.5"] RAG --> VS[("ChromaDB")] RAG --> D["文档目录
DocsDir
PDF/Word/Markdown/TXT"] classDef uN fill:#e3f2fd,stroke:#1976d2,stroke-width:2px,color:#0d47a1 classDef aN fill:#e0f2f1,stroke:#00897b,stroke-width:2px,color:#004d40 classDef tN fill:#e0f7fa,stroke:#00acc1,stroke-width:2px,color:#006064 class U uN class API,P02 aN class T1,T2,RAG,Parser,Embed,VS,D tN

4.3.2 三层架构设计

全书统一的三层架构,在本项目落成五个文件,各司其职:

层级 职责 文件名 核心类/函数
模型层 数据结构定义、状态管理、常量配置 models.py DocumentChunk、RetrievalResult、IndexState
Prompt 层 系统提示词、答案生成模板 prompts.py RAG_SYSTEM_PROMPT、ANSWER_GENERATION_PROMPT
仓储层 索引管理、文档加载、检索、持久化 repositories.py RAGRepository、get_rag_repository()
服务层 工具函数、流程编排、错误处理 graph.py retrieve_knowledge、add_document_to_kb
项目层 Agent 构建、项目注册 project.py KnowledgeQAProject

4.3.3 技术选型

先聊一个反直觉的事:语义检索,凭什么比关键词还准?

员工问:"年假能攒着下一年用吗?“关键词检索去匹配"年假”“攒"这些字。可《员工手册》里那句原话叫"年假不可结转至次年”——一个字都没对上,关键词检索直接漏掉,明明答案就在那儿。

语义检索不抠字面,看的是"意思"。它把每段文字和你的问题都转成一串数字(向量),再算两串数字的距离。"能攒着下一年用吗"和"不可结转至次年"意思相反、话题一致,向量距离很近,于是被捞了上来。

💡 顿悟时刻:关键词检索匹配的是"字",语义检索匹配的是"意"。员工从不会用文档的原话提问,这正是语义检索的主场。

当然,语义检索也不是万能——专有名词、编号、人名这类精确匹配,关键词(BM25)反而更稳。所以生产级 RAG 常常是"语义 + 关键词"混合检索,取长补短。本项目用 LlamaIndex 默认的语义检索打底,架构上预留了混合检索的扩展位。

基于以上判断,选型如下:

技术点 选型 理由
RAG 检索 LlamaIndex VectorStoreIndex 成熟稳定、文档加载器丰富
向量库 ChromaDB(LlamaIndex 内置集成) 轻量、开箱即用、本地可跑
Embedding HuggingFace BGE-small-zh-v1.5 本地开源、中文效果好、免额外 API Key
文档切分 SentenceSplitter chunk=800 / overlap=100 适合中文段落,overlap 防止关键信息被截断
检索策略 语义检索(Embedding) LlamaIndex 默认;可扩展为混合检索
Agent create_agent 统一全书的 Agent 构建方式
模型 Claude Sonnet 4.6 长上下文,适合拼接检索结果 + 问题

4.3.4 关键设计:惰性索引加载

想象服务一启动,就把上千份文档的向量索引全载进内存——用户还没提问,机器先卡半分钟。RAGRepository 偏不这么干,它信的是"懒汉哲学":首次 retrieve()add_document() 真正要用时,才去构建/加载索引。启动快,用到再说。

4.3.5 关键设计:防幻觉的双重保障

幻觉为什么痛?因为它"看起来太像真的"。AI 一本正经地说"差旅住宿 600 元",员工信了,财务退单了,锅算谁的?所以本项目在两个关口设防:

  1. 检索阈值similarity_cutoff=0.6,低于此值的结果自动丢弃——宁可不说,也不拿不相关的内容硬凑
  2. Prompt 约束:System Prompt 明文要求"检索结果为空时如实告知未找到,不得编造"——给模型立规矩:不会就说不会

⚠️ 避坑:单靠 Prompt 约束防不住幻觉,模型有时会"脑补"检索结果。阈值过滤是硬关卡,Prompt 是软约束,两道一起上才稳。

4.3.6 关键设计:增量索引 vs 全量重建

每来一份新文档就把整个索引推倒重建?那 1000 份文档的库,加第 1001 份得等半天。add_document() 用的是 index.insert() 而非 index.from_documents()——只往已有索引里"添砖加瓦",不动原来的地基,新增文档不影响已有索引。

💡 顿悟时刻:增量索引的代价是"只增不便删"——删除文档得靠元数据过滤另行处理。如果你的场景要频繁删改,记得预留这一手。


4.4 动手写:三层架构完整代码

需求看完了,设计定了,该动手了。本项目沿用全书统一的三层架构,下面逐层贴出完整代码。代码是"骨架",每段后面的"核心讲解"是骨架上的肉——告诉你为什么这么写。

4.4.1 三层架构完整代码

models.py - 领域模型与数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
"""项目二:领域模型与数据类型。

定义 RAG 系统中使用的数据结构、状态类型和配置常量。
"""
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum, auto
from pathlib import Path
from typing import Any

from core.config import DATA_DIR, get_settings

# =========================================================================
# 目录配置
# =========================================================================
DOCS_DIR = DATA_DIR / "knowledge_base"
DOCS_DIR.mkdir(parents=True, exist_ok=True)


def get_vector_store_dir() -> Path:
"""获取向量存储目录路径。"""
settings = get_settings()
return Path(settings.vector_store_dir) / "knowledge_qa"


# =========================================================================
# 数据传输对象 (DTO)
# =========================================================================
@dataclass
class DocumentChunk:
"""文档片段数据模型。"""
text: str
source: str = "未知文档"
metadata: dict[str, Any] = field(default_factory=dict)

def __post_init__(self) -> None:
if not self.text.strip():
raise ValueError("文档内容不能为空")


@dataclass
class RetrievalResult:
"""检索结果数据模型。"""
text: str
score: float
source: str

@classmethod
def from_node(cls, node: Any) -> "RetrievalResult":
"""从 LlamaIndex 的 Node 对象构建检索结果。"""
source = node.metadata.get("file_name", "未知文档")
text = node.get_text()
score = round(node.score or 0.0, 4)
return cls(text=text, score=score, source=source)

def to_dict(self) -> dict[str, Any]:
"""转换为字典格式。"""
return {
"text": self.text,
"score": self.score,
"source": self.source,
}


# =========================================================================
# 索引状态
# =========================================================================
class IndexState(Enum):
"""索引状态枚举。"""
NOT_LOADED = "not_loaded"
LOADING = "loading"
LOADED = "loaded"
ERROR = "error"

核心讲解

数据模型是三层架构的"通用语言"——上层、仓储层、服务层都靠它传话,所以得先定清楚。

  • 使用 @dataclass 定义数据结构,类型安全、轻量
  • IndexState 继承自 Enum,把索引的四种状态(未加载/加载中/已加载/出错)变成类型安全的状态机
  • RetrievalResult.from_node() 把 LlamaIndex 的 Node 对象翻译成内部模型,外界不直接碰底层库的数据结构——这就是适配器模式,换库时上层不用动

prompts.py - 系统提示词与模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
"""项目二:Prompt 定义。

集中管理所有系统提示词和生成模板,便于统一优化和版本管理。
"""
from __future__ import annotations

from typing import Final

# =========================================================================
# RAG 系统提示词
# =========================================================================
RAG_SYSTEM_PROMPT: Final[str] = """你是企业知识库问答助手,基于公司内部文档回答员工问题。

工作规则:
1. 回答任何问题前,必须先调用 retrieve_knowledge 检索相关文档。
2. 严格基于检索到的文档内容回答,不得添加文档中没有的信息。
3. 如果检索结果为空,回答"未在企业知识库中找到相关文档,建议联系 HR 部门确认"。
4. 回答时,在关键信息后标注出处,例如:【来源:员工手册.pdf】
5. 回答用中文,简洁清晰。
6. 如果用户要求添加文档,调用 add_document_to_kb 工具。
"""

# =========================================================================
# 答案生成 Prompt(用于直接生成模式)
# =========================================================================
ANSWER_GENERATION_PROMPT: Final[str] = """基于以下检索到的文档内容,回答用户的问题。

【检索到的文档】
{context}

【用户问题】
{query}

【回答要求】
1. 只使用上面文档中出现的信息回答
2. 不添加任何文档中没有的内容
3. 如果文档中没有相关信息,回答"未在企业知识库中找到相关文档,建议联系 HR 部门确认"
4. 回答结尾标注信息来源,例如:【来源:员工手册.pdf】
5. 用中文回答,简洁明了
"""

# =========================================================================
# 工具描述
# =========================================================================
RETRIEVE_KNOWLEDGE_DESCRIPTION: Final[str] = """从企业知识库中检索与查询相关的文档内容。

适用于:政策查询、流程说明、制度问答、产品文档等。
返回最相关的文档片段及出处。"""

ADD_DOCUMENT_DESCRIPTION: Final[str] = """将文档文件添加到知识库中(支持 PDF / Word / Markdown / TXT)。

file_path 是文档的绝对路径或相对于知识库目录的路径。"""

核心讲解

Prompt 是给模型立的"规矩",而且要白纸黑字、集中管。

  • Final 类型注解标记常量,防意外修改
  • Prompt 集中放一处,便于 A/B 测试和版本追踪
  • 区分两种用法:Agent 调用模式(RAG_SYSTEM_PROMPT)和直接生成模式(ANSWER_GENERATION_PROMPT

💡 顿悟时刻:留意 RAG_SYSTEM_PROMPT 里那句"回答任何问题前,必须先调用 retrieve_knowledge"——这是防幻觉的硬指令。模型爱直接答,你得逼它先翻书。


repositories.py - RAG 仓储层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
"""项目二:仓储层实现。

封装 LlamaIndex 向量索引的底层操作,提供清晰的仓储接口。
处理文档加载、索引构建、检索、持久化等数据访问逻辑。
"""
from __future__ import annotations

from pathlib import Path
from typing import Any

from core.config import get_settings
from core.logging_conf import get_logger
from .models import (
DocumentChunk,
IndexState,
RetrievalResult,
get_vector_store_dir,
)

logger = get_logger("p02.knowledge_qa.repository")


# =========================================================================
# RAG 仓储接口
# =========================================================================
class RAGRepository:
"""RAG 仓储层,封装向量索引的所有操作。

职责:
- 管理索引的生命周期(创建/加载/持久化)
- 文档索引与增量更新
- 语义检索与结果格式化
"""

def __init__(self) -> None:
self._index: Any | None = None
self._state = IndexState.NOT_LOADED

def _ensure_loaded(self) -> None:
"""惰性加载索引(避免启动时加载大索引)。

Raises:
RuntimeError: 索引加载失败时抛出
"""
if self._state == IndexState.LOADED:
return

if self._state == IndexState.LOADING:
logger.warning("索引正在加载中,等待完成...")
return

self._state = IndexState.LOADING
try:
self._build_index()
self._state = IndexState.LOADED
except Exception as e:
self._state = IndexState.ERROR
logger.error("索引加载失败: %s", e)
raise RuntimeError(f"RAG 索引加载失败: {e}") from e

def _build_index(self) -> None:
"""构建或加载向量索引。

如果已有持久化索引则加载,否则创建空索引。

注意:此方法会临时修改 LlamaIndex 全局 Settings(embed_model 和 node_parser),
操作完成后会恢复原始值,以避免污染其他模块的配置。
"""
from llama_index.core import (
Settings,
StorageContext,
VectorStoreIndex,
load_index_from_storage,
)
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

settings = get_settings()

# 保存原始 Settings,构建索引后恢复(防止全局污染)
original_embed_model = Settings.embed_model
original_node_parser = Settings.node_parser

try:
# 临时配置 Embedding 模型和文档切分器
logger.info("初始化 Embedding 模型: %s", settings.embedding_model)
Settings.embed_model = HuggingFaceEmbedding(
model_name=settings.embedding_model,
embed_batch_size=32,
)

Settings.node_parser = SentenceSplitter(
chunk_size=800,
chunk_overlap=100,
)

persist_dir = get_vector_store_dir()
storage_dir = Path(persist_dir)

if (storage_dir / "docstore.json").exists():
# 已存在索引,加载
logger.info("加载已有索引: %s", persist_dir)
storage_context = StorageContext.from_defaults(
persist_dir=str(persist_dir),
)
self._index = load_index_from_storage(storage_context)
logger.info("RAG 索引加载完成")
else:
# 新建空索引
logger.info("创建新索引: %s", persist_dir)
self._index = VectorStoreIndex([])
storage_dir.mkdir(parents=True, exist_ok=True)
self._index.storage_context.persist(persist_dir=str(persist_dir))
logger.info("RAG 空索引已创建")
finally:
# 恢复原始 Settings,防止污染其他模块
Settings.embed_model = original_embed_model
Settings.node_parser = original_node_parser
logger.debug("已恢复 LlamaIndex Settings 原始值")

def add_document(self, file_path: str | Path) -> int:
"""追加文档到索引(增量,不重建)。

Args:
file_path: 文档路径,支持单个文件或目录

Returns:
新增的文档片段数量

Raises:
FileNotFoundError: 文件不存在时抛出
RuntimeError: 索引加载失败时抛出
"""
self._ensure_loaded()

path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"文档不存在: {file_path}")

from llama_index.core import SimpleDirectoryReader

if path.is_dir():
documents = SimpleDirectoryReader(str(path)).load_data()
else:
documents = SimpleDirectoryReader(
input_files=[str(path)]
).load_data()

if not documents:
logger.warning("文档为空或无内容: %s", file_path)
return 0

logger.info("正在索引文档: %s (%d 页)", file_path, len(documents))

for doc in documents:
self._index.insert(doc)

# 持久化更新
persist_dir = get_vector_store_dir()
self._index.storage_context.persist(persist_dir=str(persist_dir))

logger.info("文档索引完成: %s, %d 个片段", file_path, len(documents))
return len(documents)

def retrieve(
self,
query: str,
top_k: int | None = None,
) -> list[RetrievalResult]:
"""检索最相关的文档片段。

Args:
query: 查询文本
top_k: 返回结果数量,None 则使用配置值

Returns:
检索结果列表,按相关度降序排列

Raises:
RuntimeError: 索引加载失败时抛出
"""
if not query.strip():
logger.warning("空查询,返回空结果")
return []

self._ensure_loaded()

settings = get_settings()
k = top_k or settings.top_k_retrieval
threshold = settings.retrieval_similarity_threshold

logger.debug("执行检索: query=%s, top_k=%d, threshold=%.2f",
query[:50], k, threshold)

retriever = self._index.as_retriever(
similarity_top_k=k,
similarity_cutoff=threshold,
)
nodes = retriever.retrieve(query)

results = []
for node in nodes:
result = RetrievalResult.from_node(node)
if result.text.strip():
results.append(result)

logger.debug("检索完成: %d 条结果", len(results))
return results

def retrieve_as_dict(
self,
query: str,
top_k: int | None = None,
) -> list[dict[str, Any]]:
"""检索并返回字典格式的结果(工具函数兼容)。"""
results = self.retrieve(query, top_k)
return [r.to_dict() for r in results]

def clear_index(self) -> None:
"""清空当前索引(用于测试)。"""
from llama_index.core import VectorStoreIndex
self._index = VectorStoreIndex([])
self._state = IndexState.LOADED


# 全局单例仓储实例
_repository: RAGRepository | None = None


def get_rag_repository() -> RAGRepository:
"""获取 RAG 仓储单例。"""
global _repository
if _repository is None:
_repository = RAGRepository()
return _repository


def set_rag_repository(repo: RAGRepository | None) -> None:
"""设置 RAG 仓储实例(用于测试 Mock)。"""
global _repository
_repository = repo

核心讲解

仓储层是整个 RAG 的"地基"——LLM 可以换、检索策略可以换,但"文档怎么进库、怎么查出来"这一层得稳。

  • 单一职责:仓储层只负责数据访问,不掺业务逻辑
  • 状态管理:明确的加载状态机(NOT_LOADED → LOADING → LOADED/ERROR),防并发重复加载
  • 错误处理:所有异常都有明确的日志和错误信息,不静默吞掉
  • 惰性加载:首次使用才初始化索引,避免启动阻塞
  • 依赖倒置:上层通过 get_rag_repository() 拿实例,底层可替换(测试时塞 Mock)

💡 顿悟时刻:注意 _build_index() 里"保存原始 Settings → 临时修改 → finally 恢复"那一段。LlamaIndex 的 Settings 是全局的,改了不还回去会污染其他项目。这个"借了要还"的细节,是真实工程里踩过坑才会写的。


graph.py - 服务层与 LangGraph 工作流编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""项目二:服务层与流程编排。

使用 LangGraph StateGraph 实现完整的 RAG 工作流:
问题 -> 检索 -> 生成答案 -> 返回结果

状态包含 question、contexts、answer,节点包括检索和生成。
"""
from __future__ import annotations

from typing import Any, TypedDict

from langchain.tools import tool
from langgraph.graph import END, StateGraph

from core.logging_conf import get_logger
from projects.p02_knowledge_qa.models import RetrievalResult
from projects.p02_knowledge_qa.prompts import (
ADD_DOCUMENT_DESCRIPTION,
ANSWER_GENERATION_PROMPT,
RETRIEVE_KNOWLEDGE_DESCRIPTION,
RAG_SYSTEM_PROMPT,
)
from projects.p02_knowledge_qa.repositories import get_rag_repository

logger = get_logger("p02.knowledge_qa.service")


# =========================================================================
# LangGraph RAG State 定义
# =========================================================================
class RAGState(TypedDict):
"""RAG 工作流状态。"""
question: str # 用户问题
contexts: list[str] # 检索到的文档片段列表
answer: str # 生成的答案


# =========================================================================
# 工具函数(保持向后兼容)
# =========================================================================
@tool
def retrieve_knowledge(query: str) -> str:
"""从企业知识库中检索与查询相关的文档内容。

适用于:政策查询、流程说明、制度问答、产品文档等。
返回最相关的文档片段及出处。
"""
try:
repo = get_rag_repository()
results = repo.retrieve(query)
return _format_retrieval_results(results)
except Exception as e:
logger.error("知识库检索失败: query=%s error=%s", query, e)
return "知识库检索服务暂时不可用,请稍后重试。"


@tool
def add_document_to_kb(file_path: str) -> str:
"""将文档文件添加到知识库中(支持 PDF / Word / Markdown / TXT)。

file_path 是文档的绝对路径或相对于知识库目录的路径。
"""
try:
repo = get_rag_repository()
cnt = repo.add_document(file_path)
return f"已成功将《{file_path}》添加到知识库,共索引 {cnt} 个知识片段。"
except FileNotFoundError as e:
return f"文档《{file_path}》不存在,请检查路径是否正确。"
except Exception as e:
logger.error("文档索引失败: file=%s error=%s", file_path, e)
return f"文档《{file_path}》添加失败: {e}"


def _format_retrieval_results(results: list[RetrievalResult]) -> str:
"""将检索结果格式化为可读的字符串。"""
if not results:
return "未在企业知识库中找到与您问题相关的文档。"

lines = []
for i, result in enumerate(results, 1):
lines.append(
f"[{i}] 来源《{result.source}》(相关度: {result.score})\n"
f"{result.text}"
)
return "\n\n---\n\n".join(lines)


# =========================================================================
# LangGraph RAG 工作流节点
# =========================================================================
def retrieve_node(state: RAGState) -> dict[str, Any]:
"""检索节点:根据用户问题检索相关文档。

Args:
state: 当前工作流状态,包含 question

Returns:
更新后的状态,包含检索到的 contexts
"""
question = state["question"]
logger.debug("RAG 工作流 - 执行检索: %s", question[:50])

repo = get_rag_repository()
results = repo.retrieve(question)
contexts = [f"来源《{r.source}》:{r.text}" for r in results]

logger.debug("RAG 工作流 - 检索完成: %d 条结果", len(contexts))
return {"contexts": contexts}


def generate_node(state: RAGState) -> dict[str, Any]:
"""生成节点:基于检索结果生成答案。

Args:
state: 当前工作流状态,包含 question 和 contexts

Returns:
更新后的状态,包含生成的 answer
"""
from core import build_chat_model

question = state["question"]
contexts = state["contexts"]

logger.debug("RAG 工作流 - 执行生成: %s", question[:50])

if not contexts:
answer = "未在企业知识库中找到相关文档,建议联系 HR 部门确认。"
else:
context_text = "\n\n".join(contexts)
prompt = ANSWER_GENERATION_PROMPT.format(
context=context_text,
query=question,
)
model = build_chat_model()
response = model.invoke(prompt)
answer = response.content

logger.debug("RAG 工作流 - 生成完成")
return {"answer": answer}


# =========================================================================
# LangGraph RAG 工作流构建
# =========================================================================
def build_rag_workflow_graph() -> StateGraph:
"""构建 RAG 工作流图。

工作流:
question -> retrieve_node -> contexts -> generate_node -> answer -> END
"""
workflow = StateGraph(RAGState)

# 添加节点
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("generate", generate_node)

# 设置入口点
workflow.set_entry_point("retrieve")

# 添加边:检索 -> 生成 -> 结束
workflow.add_edge("retrieve", "generate")
workflow.add_edge("generate", END)

return workflow


# =========================================================================
# RAGWorkflow 类(保持向后兼容)
# =========================================================================
class RAGWorkflow:
"""RAG 工作流编排,基于 LangGraph StateGraph 实现。

完整的检索-生成流水线:
1. retrieve_node: 根据问题检索相关文档
2. generate_node: 基于检索结果生成答案

同时保持与 create_agent 的向后兼容性。
"""

@staticmethod
def get_tools() -> list[Any]:
"""获取 RAG 工具列表(用于 create_agent 兼容)。"""
return [retrieve_knowledge, add_document_to_kb]

@staticmethod
def get_system_prompt() -> str:
"""获取系统提示词(用于 create_agent 兼容)。"""
return RAG_SYSTEM_PROMPT

@staticmethod
def compile_graph() -> Any:
"""编译并返回 LangGraph 工作流图。

Returns:
编译后的 LangGraph 可执行图
"""
workflow = build_rag_workflow_graph()
return workflow.compile()

@staticmethod
def run(question: str) -> str:
"""运行 RAG 工作流回答问题。

Args:
question: 用户问题

Returns:
生成的答案
"""
graph = RAGWorkflow.compile_graph()
result = graph.invoke({"question": question})
return result["answer"]


# 导出工具函数供外部使用
__all__ = [
"retrieve_knowledge",
"add_document_to_kb",
"RAGWorkflow",
"RAGState",
"retrieve_node",
"generate_node",
"build_rag_workflow_graph",
]

核心讲解

这一层回答两个问题:检索和生成分几步走?怎么和 Agent 框架接上?

  • StateGraph 架构:用 TypedDict 定义 RAGState,三个字段 question、contexts、answer,就是 RAG 流水线里流动的"水"
  • 节点分离retrieve_node 管检索,generate_node 管生成,各管一段;将来想加"重排""多轮追问"节点,往图里插就行
  • 工作流编排:检索 -> 生成,一条线串起来,靠 StateGraph 撑起可扩展的流水线
  • 向后兼容:保留 get_tools()get_system_prompt(),同一套代码既能跑 StateGraph 工作流,也能挂到 create_agent

project.py - 项目定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""项目二:项目定义。

定义 KnowledgeQAProject 类,继承 BaseProject,
实现 Agent 构建和项目注册。
"""
from __future__ import annotations

from typing import Any

from core import BaseProject, build_chat_model, registry
from projects.p02_knowledge_qa.graph import RAGWorkflow


class KnowledgeQAProject(BaseProject):
"""企业知识库问答 Agent 项目。

提供基于 RAG 技术的企业内部文档问答能力,支持:
- 文档索引与增量更新
- 语义检索
- 带出处的精准回答
- 防幻觉机制
"""

id = "p02_knowledge_qa"
name = "企业知识库问答 Agent"
description = "基于 LangGraph+LlamaIndex 的 RAG 问答,附带引用出处。"
capabilities = ["RAG", "LangGraph", "LlamaIndex"]

def build_agent(self) -> Any:
"""构建 RAG Agent 实例。

使用 create_agent 统一构建方式,集成:
- 知识库检索工具
- 文档添加工具
- RAG 专用系统提示词
"""
from langchain.agents import create_agent

return create_agent(
model=build_chat_model(),
tools=RAGWorkflow.get_tools(),
system_prompt=RAGWorkflow.get_system_prompt(),
)


# 注册项目实例
registry.register(KnowledgeQAProject())

核心讲解

项目层是"门面",把编排层包成统一接口,塞进全局注册表。

  • 统一接口:继承 BaseProject,与全书其他项目保持一致的 API
  • 依赖最小化:只依赖编排层,不直接调用仓储层
  • 自动注册:模块导入时自动注册到全局 registry

__init__.py - 模块导出与向后兼容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""项目二:企业知识库问答 Agent(生产级 RAG)

主讲能力:RAG(检索增强生成),LangGraph + LlamaIndex

业务场景:企业内部文档问答,员工可以问"年假有几天""报销流程是什么",
Agent 基于上传的 PDF/Word/Markdown 文档精准回答,附带引用出处。

生产级特性:
- LlamaIndex 向量索引(支持混合检索:关键词 BM25 + 语义 Embedding)
- LangGraph 编排检索-重排-生成流水线
- 引用溯源(每个答案标注来源于哪个文档的哪一段)
- 增量更新(新增文档无需重建全量索引)
- 防幻觉(资料中没有就说"未找到",绝不编造)
- 多格式文档支持(PDF / Word / Markdown / TXT)

三层架构:
- models.py: 领域模型与数据类型
- prompts.py: 系统提示词与答案生成模板
- repositories.py: RAG 仓储层(索引、检索、持久化)
- graph.py: 服务层与工具函数
- project.py: 项目定义与 Agent 构建
"""
from __future__ import annotations

# 向后兼容:重新导出旧版 API,保持导入路径不变
from projects.p02_knowledge_qa.graph import (
RAGWorkflow,
add_document_to_kb,
retrieve_knowledge,
)
from projects.p02_knowledge_qa.models import (
DOCS_DIR,
DocumentChunk,
IndexState,
RetrievalResult,
)
from projects.p02_knowledge_qa.project import KnowledgeQAProject
from projects.p02_knowledge_qa.repositories import (
RAGRepository,
get_rag_repository,
set_rag_repository,
)

# 为了完全兼容旧版,提供 RAGEngine 别名
RAGEngine = RAGRepository


def _get_rag() -> RAGRepository:
"""向后兼容:旧版代码使用 _get_rag() 获取 RAG 引擎实例。"""
return get_rag_repository()


# 导出所有公共 API
__all__ = [
# 数据模型
"DocumentChunk",
"RetrievalResult",
"IndexState",
"DOCS_DIR",
# 仓储层
"RAGRepository",
"RAGEngine", # 兼容别名
"get_rag_repository",
"set_rag_repository",
"_get_rag", # 兼容旧版
# 工具与服务
"retrieve_knowledge",
"add_document_to_kb",
"RAGWorkflow",
# 项目
"KnowledgeQAProject",
]

核心讲解

这是模块的"前台",外界只看这一份导出清单。

  • 清晰的模块文档:顶部说明项目功能和架构
  • 向后兼容:提供 RAGEngine 别名和 _get_rag() 函数,旧代码无需改一行
  • 显式导出__all__ 明确列出公共 API,控制暴露范围

test_agent.py - 测试套件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""项目二:企业知识库问答 Agent - 测试套件。

覆盖三层架构各层的单元测试和集成测试:
- models: 数据模型验证
- repositories: 仓储层测试(索引、检索、持久化)
- graph: 工具函数测试
- project: Agent 集成测试

运行:
cd backend
pytest projects/p02_knowledge_qa/test_agent.py -v
"""
from __future__ import annotations

import os
import tempfile
from pathlib import Path

import pytest


# =========================================================================
# 模型层测试
# =========================================================================
class TestModels:
"""测试领域模型与数据类型。"""

def test_index_state_enum(self) -> None:
"""测试 IndexState 是真正的 Enum 类型。"""
from enum import Enum
from projects.p02_knowledge_qa.models import IndexState

assert issubclass(IndexState, Enum), "IndexState 应该是 Enum 类型"
assert IndexState.NOT_LOADED.value == "not_loaded"
assert IndexState.LOADING.value == "loading"
assert IndexState.LOADED.value == "loaded"
assert IndexState.ERROR.value == "error"

def test_document_chunk_validation(self) -> None:
"""测试 DocumentChunk 不能为空内容验证。"""
from projects.p02_knowledge_qa.models import DocumentChunk

# 正常情况
chunk = DocumentChunk(text="测试内容", source="test.txt")
assert chunk.text == "测试内容"
assert chunk.source == "test.txt"

# 空内容应该抛出异常
with pytest.raises(ValueError, match="不能为空"):
DocumentChunk(text=" ", source="test.txt")

def test_retrieval_result_conversion(self) -> None:
"""测试 RetrievalResult 的字典转换。"""
from projects.p02_knowledge_qa.models import RetrievalResult

result = RetrievalResult(text="测试内容", score=0.95, source="test.txt")
result_dict = result.to_dict()

assert result_dict["text"] == "测试内容"
assert result_dict["score"] == 0.95
assert result_dict["source"] == "test.txt"


# =========================================================================
# 仓储层测试
# =========================================================================
class TestRAGRepository:
"""测试 RAG 仓储层核心功能。"""

def test_add_and_retrieve(self) -> None:
"""测试文档索引和检索流程。"""
from projects.p02_knowledge_qa.repositories import RAGRepository

# 创建新的仓储实例,避免使用全局单例
repo = RAGRepository()
repo.clear_index() # 确保是干净的索引

# 创建临时文档
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False, encoding="utf-8"
) as f:
f.write("年假政策:入职满 1 年享 5 天年假,满 3 年享 10 天年假。")
tmp = f.name

try:
# 添加文档
cnt = repo.add_document(tmp)
assert cnt > 0, "索引应产出至少 1 个 chunk"

# 检索
results = repo.retrieve("年假有几天")
assert len(results) > 0
assert any("年假" in r.text for r in results)
finally:
Path(tmp).unlink(missing_ok=True)

def test_retrieve_empty_query(self) -> None:
"""测试空查询或无关查询应返回空结果。"""
from projects.p02_knowledge_qa.repositories import RAGRepository

repo = RAGRepository()
repo.clear_index()

# 空查询
results = repo.retrieve("")
assert len(results) == 0, "空查询应返回空结果"

# 无关查询(空索引下)
results = repo.retrieve("xyz不存在的查询abc123")
assert len(results) == 0, "空索引下无关查询应返回空"

def test_add_nonexistent_file(self) -> None:
"""测试添加不存在的文件应抛出 FileNotFoundError。"""
from projects.p02_knowledge_qa.repositories import RAGRepository

repo = RAGRepository()
repo.clear_index()

with pytest.raises(FileNotFoundError):
repo.add_document("/nonexistent/path/file.txt")

def test_get_rag_repository_singleton(self) -> None:
"""测试仓储单例模式。"""
from projects.p02_knowledge_qa.repositories import (
get_rag_repository,
set_rag_repository,
)

# 正常获取
repo1 = get_rag_repository()
repo2 = get_rag_repository()
assert repo1 is repo2, "应该返回同一个单例实例"

# 测试设置自定义实例
set_rag_repository(None)
repo3 = get_rag_repository()
assert repo3 is not repo1, "重置后应该创建新实例"


# =========================================================================
# 工具函数测试
# =========================================================================
class TestTools:
"""测试工具函数。"""

def test_retrieve_knowledge_empty(self) -> None:
"""测试空知识库检索。"""
from projects.p02_knowledge_qa.graph import retrieve_knowledge

out = retrieve_knowledge.invoke({"query": "xyz不存在xyz"})
assert "未在" in out or "未找到" in out, "空检索应返回提示信息"

def test_add_document_tool_success(self) -> None:
"""测试添加文档工具成功场景。"""
from projects.p02_knowledge_qa.graph import add_document_to_kb

with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False, encoding="utf-8"
) as f:
f.write("报销流程:提交申请→直属领导审批→财务审核→打款,约 5 个工作日。")
tmp = f.name

try:
out = add_document_to_kb.invoke({"file_path": tmp})
assert "已成功" in out or "索引" in out
finally:
Path(tmp).unlink(missing_ok=True)

def test_add_document_tool_not_found(self) -> None:
"""测试添加不存在的文档。"""
from projects.p02_knowledge_qa.graph import add_document_to_kb

out = add_document_to_kb.invoke({"file_path": "/nonexistent/file.txt"})
assert "不存在" in out, "文件不存在应该返回错误提示"

def test_add_document_tool_error_handling(self) -> None:
"""测试添加文档时的错误处理。"""
from projects.p02_knowledge_qa.graph import add_document_to_kb

# 传入无效路径(不是文件也不是目录)
out = add_document_to_kb.invoke({"file_path": ""})
assert "失败" in out or "不存在" in out


# =========================================================================
# Prompt 测试
# =========================================================================
class TestPrompts:
"""测试系统提示词和约束条件。"""

def test_rag_system_prompt_contains_retrieve_constraint(self) -> None:
"""验证系统提示词包含必须先检索的约束。"""
from projects.p02_knowledge_qa.prompts import RAG_SYSTEM_PROMPT

assert "必须先调用 retrieve_knowledge" in RAG_SYSTEM_PROMPT, \
"系统提示词应包含必须先调用检索的约束"

def test_rag_system_prompt_contains_source_citation(self) -> None:
"""验证系统提示词包含标注出处的要求。"""
from projects.p02_knowledge_qa.prompts import RAG_SYSTEM_PROMPT

assert "出处" in RAG_SYSTEM_PROMPT or "来源" in RAG_SYSTEM_PROMPT, \
"系统提示词应包含标注出处的要求"

def test_rag_system_prompt_contains_no_hallucination(self) -> None:
"""验证系统提示词包含不知道就说不知道的防幻觉约束。"""
from projects.p02_knowledge_qa.prompts import RAG_SYSTEM_PROMPT

assert "未在企业知识库中找到" in RAG_SYSTEM_PROMPT or "未找到" in RAG_SYSTEM_PROMPT, \
"系统提示词应包含防幻觉约束(未找到就说未找到)"

def test_rag_system_prompt_all_constraints_present(self) -> None:
"""验证系统提示词包含所有核心约束。"""
from projects.p02_knowledge_qa.prompts import RAG_SYSTEM_PROMPT

# 检查三大核心约束都存在
constraints = [
"必须先调用 retrieve_knowledge", # 1. 必须先检索
"出处", # 2. 标注出处
"未在企业知识库中找到", # 3. 不知道就说不知道
]
for constraint in constraints:
assert constraint in RAG_SYSTEM_PROMPT, \
f"系统提示词缺少核心约束: {constraint}"

def test_rag_workflow_get_system_prompt(self) -> None:
"""验证 RAGWorkflow.get_system_prompt() 返回正确的提示词。"""
from projects.p02_knowledge_qa.graph import RAGWorkflow
from projects.p02_knowledge_qa.prompts import RAG_SYSTEM_PROMPT

prompt = RAGWorkflow.get_system_prompt()
assert prompt == RAG_SYSTEM_PROMPT
assert "必须先调用 retrieve_knowledge" in prompt


# =========================================================================
# LangGraph 工作流测试
# =========================================================================
class TestRAGWorkflowGraph:
"""测试 LangGraph RAG 工作流实现。"""

def test_state_graph_structure(self) -> None:
"""验证 StateGraph 结构正确。"""
from langgraph.graph import StateGraph
from projects.p02_knowledge_qa.graph import build_rag_workflow_graph

graph = build_rag_workflow_graph()
assert isinstance(graph, StateGraph), "应该返回 StateGraph 实例"

def test_rag_state_definition(self) -> None:
"""验证 RAGState 包含必要字段。"""
from projects.p02_knowledge_qa.graph import RAGState
from typing import TypedDict

assert issubclass(RAGState, TypedDict)
# 验证字段存在(通过 TypedDict 注解检查)
annotations = RAGState.__annotations__
assert "question" in annotations, "RAGState 应包含 question 字段"
assert "contexts" in annotations, "RAGState 应包含 contexts 字段"
assert "answer" in annotations, "RAGState 应包含 answer 字段"

def test_nodes_exist(self) -> None:
"""验证工作流包含 retrieve 和 generate 节点。"""
from projects.p02_knowledge_qa.graph import build_rag_workflow_graph

graph = build_rag_workflow_graph()
nodes = graph.nodes
assert "retrieve" in nodes, "工作流应包含 retrieve 节点"
assert "generate" in nodes, "工作流应包含 generate 节点"

def test_retrieve_node_function(self) -> None:
"""测试 retrieve_node 函数签名和行为。"""
from projects.p02_knowledge_qa.graph import retrieve_node

result = retrieve_node({"question": "年假有几天"})
assert "contexts" in result, "retrieve_node 应该返回 contexts"
assert isinstance(result["contexts"], list), "contexts 应该是列表"

def test_compile_graph(self) -> None:
"""测试 RAGWorkflow.compile_graph() 返回可执行的图。"""
from projects.p02_knowledge_qa.graph import RAGWorkflow

compiled = RAGWorkflow.compile_graph()
assert hasattr(compiled, "invoke"), "编译后的图应该有 invoke 方法"


# =========================================================================
# 集成测试
# =========================================================================
@pytest.mark.skipif(
not os.getenv("ANTHROPIC_API_KEY"),
reason="需要 ANTHROPIC_API_KEY",
)
class TestAgentIntegration:
"""Agent 集成测试。"""

def test_rag_agent_uses_retrieval(self) -> None:
"""测试 Agent 能正确使用检索工具回答问题。"""
from projects.p02_knowledge_qa import KnowledgeQAProject, _get_rag

# 先添加一条知识
rag = _get_rag()
with tempfile.NamedTemporaryFile(
mode="w", suffix=".txt", delete=False, encoding="utf-8"
) as f:
f.write("远程办公:每周最多可申请 2 天,需主管审批。")
tmp = f.name

try:
rag.add_document(tmp)
project = KnowledgeQAProject()
reply = project.run("远程办公怎么申请")
assert "2 天" in reply or "主管" in reply
finally:
Path(tmp).unlink(missing_ok=True)

def test_rag_agent_no_hallucination(self) -> None:
"""测试 Agent 不会编造知识库中没有的信息。"""
from projects.p02_knowledge_qa import KnowledgeQAProject

project = KnowledgeQAProject()
reply = project.run("火星上的外星人长什么样?")
# 应该说"未找到"而不是编造
assert "未在" in reply or "未找到" in reply or "不知道" in reply.lower()

核心讲解

测试不是写完代码才补的,是和代码一起长的——分层测、隔离跑、边界全覆盖。

  • 分层测试:按架构分层组织用例,出问题好定位
  • 测试隔离:用 clear_index()set_rag_repository() 保证用例间不互相污染
  • 边界覆盖:正常、异常、边界三类情况都覆盖
  • 集成验证:需要 API Key 的测试标记跳过,离线环境也能跑大部分用例

4.4.2 架构设计总结

回头看这五个文件,像是把 RAG 这头"大象"切成了五块规整的肉。为什么费这个劲?

三层架构的优势

  1. 可维护性:每个文件职责单一,改检索逻辑不会碰 Prompt,改 Prompt 不会动存储
  2. 可测试性:各层独立,仓储层能 Mock,业务逻辑离线也能测
  3. 可扩展性:想换向量数据库?只改 repositories.py,上层一行不动
  4. 可读性:新人看文件名就知道代码该往哪放——架构的"自解释"

💡 顿悟时刻:架构的价值不在"能跑",而在"能改"。今天加一个重排节点、明天换一个 embedding 模型,都不该牵一发动全身。

关键设计模式

模式 应用位置 作用
单例模式 get_rag_repository() 全局只有一个索引实例,避免重复加载
仓储模式 RAGRepository 封装数据访问,解耦业务与存储
适配器模式 RetrievalResult.from_node() 适配外部库的 Node 对象到内部数据结构
门面模式 init.py 提供统一的模块入口,隐藏内部实现细节

4.5 跑一跑:它真的行吗

测试要回答三个问题:索引-检索链路通不通?没查到时会不会硬编?端到端能不能带出处回答?

  • 离线测试:test_add_and_retrieve 验证索引-检索链路
  • 防幻觉测试:test_retrieve_empty_query 验证无关查询不返回高相关结果
  • 集成测试(需 API Key):验证 Agent 能根据检索结果回答并标注出处

4.6 送上线:让它上班

同第 2 章——项目继承 BaseProject,自动接入统一 API。Docker Compose 一键启动。


4.7 回头看:学到了什么

一句话总结项目二:我们没给模型"灌知识",而是给它配了一本"会翻的参考书"。

能力 在本项目中的体现
RAG LlamaIndex 索引 + 检索,惰性加载
Embedding HuggingFace BGE 本地化,免额外 API Key
增量索引 index.insert() 不重建
防幻觉 检索阈值 + System Prompt 约束
引用溯源 返回结果标注文档来源

常见坑

⚠️ 这三个坑,每一个都值得上线前自查一遍。

  1. chunk_size 太大:检索精度下降;太小:丢失上下文。中文建议 500–1000
  2. 不设 similarity_cutoff:无关内容也被检索出来,Agent 收到一脑袋噪音
  3. 不持久化索引:每次重启都要重建,文档一多,启动就成了漫长的等待

📌 项目二完成。 Agent 学会了"查资料"——无需训练、无需微调,丢进文档就能问,答案带出处、不编造。开卷考试这件事,它算是考明白了。下一章我们攻克 Harness 循环 + Skill——让 Agent 不仅能查资料,还能反复思考、自我修正、把经验打包


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !