对LangFlow源码进行二次开发

对LangFlow源码进行二次开发AgentServer

Posted by Ryan on 2025-12-08
Estimated Reading Time 36 Minutes
Words 7.4k In Total
Viewed Times

这是一份深度技术分析报告与系统设计文档。我们将首先深入剖析 Langflow 的底层源码机制,特别是其如何处理多 Agent 和 Workflow;随后,基于这些分析,为您设计一套生产级的 AgentServer 扩展方案。


第一部分:Langflow 源码深度解析

Langflow (v1.0+) 的本质是一个 可视化的 LangChain/LangGraph 编译器。它不直接运行图形,而是将图形(JSON Payload)转换为 Python 对象图(Dependency Graph),然后按拓扑顺序执行。

1. 核心架构与工作流原理

1.1 从 JSON 到 Python 对象 (The Graph Engine)

核心代码位于 src/backend/base/langflow/graph/ 目录下。

  1. Graph (图): graph/base.py 中的 Graph 类是核心。它接收前端传来的 JSON (data 字段),包含 nodes (节点) 和 edges (边)。
  2. Vertex (顶点): vertex/base.py 中的 Vertex 类代表一个节点。每个 Vertex 包装了一个 CustomComponent (自定义组件) 或基础 LangChain 组件。
  3. 构建过程 (Build Process):
    • 拓扑排序: Graph 会根据 edges 计算执行顺序(Topological Sort)。
    • 参数注入: 上游节点的输出(Output)会作为下游节点的输入(Input)。这是通过 edge 定义的 source_handletarget_handle 映射完成的。
    • 实例化: 每个 Vertex 调用 build() 方法。
      • 如果节点是 CustomComponent,它会执行用户定义的 build 方法(例如返回一个 ChainAgent)。
      • 如果节点是基础组件,它会利用反射机制实例化 LangChain 类。

1.2 多 Agent 与 Workflow 的底层实现

Langflow 支持多 Agent 主要有两种模式,底层调用的 API 不同:

模式 A:基于 LangChain 的 AgentExecutor (传统模式)

  • 原理: 使用 langchain.agents.AgentExecutor
  • 流程: 画布上的 “Tools” 节点被实例化为 Tool 列表 -> 注入到 “Agent” 节点 (如 OpenAI Tools Agent) -> Agent 节点被注入到 “AgentExecutor” 节点。
  • 运行: 这是一个黑盒循环(While Loop),由 LangChain 内部控制 ReAct 逻辑。

模式 B:基于 LangGraph 的多 Agent (现代模式)

这是 Langflow 近期更新的重点,也是实现复杂 Workflow 的关键。

  • API 使用: 核心依赖 langgraph.graph.StateGraphlanggraph.prebuilt.
  • 源码痕迹: 在 src/backend/base/langflow/components/ 下的某些高级组件中,或者用户自定义组件中。
  • 运行原理:
    1. 节点定义: 每个 Agent(如 “Researcher”, “Writer”)被构建为一个 Runnable (通常是 ChatModel 绑定了 Tools)。
    2. State 定义: 组件内部定义 TypedDict 作为状态。
    3. 图构建: 使用 StateGraph 添加节点 (add_node) 和边 (add_edge, add_conditional_edges)。
    4. 编译: 调用 graph.compile() 生成 CompiledGraph
    5. Langflow 的处理: Langflow 的引擎将这个 CompiledGraph 视为一个普通的 Runnable。当执行到该节点时,调用其 .invoke().stream() 方法。

1.3 关键 API 总结

  • LangChain Core: Runnable (LCEL 协议), BaseChatModel, Tool.
  • LangGraph: StateGraph, END, START, CompiledGraph.
  • Langfuse: Langflow 内部通过 langflow.services.monitor 模块集成了 Langfuse,主要通过给 Runnable 注入 callbacks=[LangfuseCallbackHandler()] 来实现。

第二部分:AgentServer 扩展设计文档

基于上述分析,Langflow 原生是一个“构建与调试工具”,直接用于生产面临着性能(每次请求重新构建图)、版本管理缺失、限流缺失等问题。

本设计旨在构建一个独立于 Langflow 原生运行逻辑,但依附于其项目结构AgentServer 模块。

1. 总体架构设计

我们将采用 模块化单体 (Modular Monolith) 架构。AgentServer 作为 Langflow 后端的一个独立子模块(Python Package),共享 Langflow 的数据库连接和基础工具类,但拥有独立的 API 路由、缓存机制和执行逻辑。

graph TD
    subgraph "Frontend "
        UI_Flow[Flow Editor] -->|Publish| API_Mgmt
        UI_Mgmt[Agent Management Page] -->|Version/Rollback| API_Mgmt
    end

    subgraph "External Consumers"
        Client[3rd Party App] -->|Invoke API| API_Public
    end

    subgraph "Agent Server (Backend Extension)"
        API_Mgmt[Management API]
        API_Public[Public Runtime API]
        
        Middleware[Rate Limiter ]
        
        subgraph "Core Logic"
            DeployMgr[Deployment Manager]
            CacheMgr[Graph Object Cache]
            Runner[Execution Engine]
        end
        
        Tracer[Langfuse Injector]
    end

    subgraph "Storage"
        MySQL[(MySQL: Deployments & Versions)]
        Redis[(Redis: Rate Limits & Cache)]
    end

    API_Public --> Middleware
    Middleware --> Runner
    Runner -->|Check Cache| CacheMgr
    CacheMgr -->|Load Snapshot| MySQL
    Runner -->|Trace| Tracer
    DeployMgr -->|Save Snapshot| MySQL

2. 数据库设计 (MySQL)

我们需要使用 SQLModel (Langflow 的 ORM) 定义两张新表,用于版本控制。

2.1 deployment 表 (部署主表)

记录对外发布的 Service 信息。

字段名 类型 说明
id UUID 主键
flow_id UUID 关联原始 Flow ID
endpoint_slug String 对外访问的 URL 后缀 (唯一索引)
name String 部署名称
active_version Integer 当前激活的版本号
rate_limit String 限流规则 (如 “60/m”)
api_key_enabled Boolean 是否开启 API Key 验证
created_at DateTime 创建时间

2.2 deployment_version 表 (版本快照表)

记录每次发布时的 Flow 静态快照(JSON)。

字段名 类型 说明
id UUID 主键
deployment_id UUID 外键关联 deployment
version Integer 版本号 (1, 2, 3…)
graph_snapshot JSON 核心:发布时的 Flow JSON 全量数据
config JSON 生产环境覆盖配置 (Tweaks)
created_at DateTime 发布时间

3. 详细模块设计

3.1 模块结构

src/backend/base/langflow/ 下新建 agent_server 目录:

1
2
3
4
5
6
7
8
9
10
11
12
13
langflow/agent_server/
├── __init__.py
├── api/ # API 路由
│ ├── management.py # 后管接口 (发布/版本)
│ └── runtime.py # 对外运行接口
├── services/ # 业务逻辑
│ ├── deployment.py # 部署逻辑
│ └── execution.py # 核心执行引擎
├── models/ # 数据库模型
│ └── deployment.py
└── utils/ # 限流与追踪工具
├── rate_limiter.py
└── tracing.py

3.2 核心流程逻辑

A. 发布流程 (Management API)

  1. 用户在前端点击 “Publish”。
  2. 后端接收 flow_id 和配置。
  3. flow 表读取当前的 data (JSON)。
  4. deployment 表创建或更新记录。
  5. deployment_version 表插入一条新记录,version 自增,并将 data 存入 graph_snapshot
  6. 关键点:Snapshot 机制保证了后续修改原 Flow 不会影响线上运行版本。

B. 运行流程 (Runtime API) - 带缓存与限流

  1. 接收请求: POST /agent-server/run/{endpoint_slug}
  2. 限流检查: 使用 fastapi-limiter + Redis,基于 IP 或 API Key 进行计数。
  3. 缓存查找:
    • 检查内存中(LRU Cache)是否已有构建好的 Runnable 对象。
    • Key 为 deployment_id + version
  4. 构建 (Cache Miss):
    • 如果缓存未命中,从 MySQL 读取 graph_snapshot
    • 调用 Graph.from_payload(snapshot)
    • 调用 graph.build() 实例化 Python 对象。
    • 存入缓存。
  5. Langfuse 注入:
    • 创建 CallbackHandler
    • 利用 Runnable.with_config(callbacks=[handler]) 动态注入。
  6. 执行:
    • 支持 stream=True 参数。
    • 如果流式:使用 astream_events (v2 API) 捕获 Token 和 Tool 事件,封装为 SSE (Server-Sent Events) 返回。
    • 如果非流式:使用 ainvoke 等待结果返回 JSON。

3.3 前端设计 (Extension)

为了不干扰原项目,我们将新建组件文件夹,但需要在原项目的路由配置中注册。

  • 技术栈: React + TailwindCSS (保持一致)。
  • 入口: 在 Flow 列表页的每个卡片上增加 “Deployments” 按钮。
  • 管理弹窗/页面:
    • 展示历史版本列表 (Table)。
    • 操作列:回滚 (Rollback)、删除、查看配置。
    • 状态指示:当前激活版本高亮。
    • 限流配置输入框。

4. 关键技术难点解决方案

4.1 Token 限制与流式响应

  • 问题: 大模型输出长文本容易超时。
  • 解决: 强制要求 API 支持 SSE。AgentServer 将利用 LangChain 的 astream_events 接口,不仅流式传输文本,还将 “Tool Use” (工具调用) 状态流式传输给前端,实现类似 ChatGPT 的 “Thinking…” 效果。

4.2 生产环境稳定性

  • 问题: 每次请求都解析 JSON 构建图非常消耗 CPU。
  • 解决: 实现 Graph Object Cache。发布后的图是静态的,构建一次后应在内存中复用 Runnable 对象。只有当版本切换或服务重启时才重新构建。

4.3 数据库隔离

  • 问题: 避免 AgentServer 的表与 Langflow 原生表冲突。
  • 解决: 使用 Alembic 进行独立的 Migration 管理,或者在表名上增加前缀 as_ (Agent Server)。

待确认事项:
请确认以上 架构设计数据库 Schema 是否符合您的预期?
如果确认,我将开始执行代码开发,首先输出数据库模型和核心服务代码。


这是一个非常完善的迭代需求。在上一版架构的基础上,我们将重点增强前端交互体验系统稳定性保障(测试)以及国际化支持

以下是针对 AgentServer v2.0 的详细设计文档,涵盖了前端、测试、限流管理和国际化方案。


AgentServer v2.0 详细设计文档

1. 前端页面交互与视觉设计 (Frontend Design)

为了保持与 Langflow 原生风格一致,我们将复用其基于 TailwindCSS 和 Shadcn UI (或类似 Radix primitives) 的组件库。

1.1 页面结构规划

我们将不干扰原有的 Flow 编辑画布,而是扩展 “My Collection” (Flow 列表页) 和新增 “Deployment Center” (部署中心)

A. Flow 列表页增强

  • 操作栏扩展: 在每个 Flow 卡片的下拉菜单中,除了原有的 “Edit”, “Delete” 外,增加 “🚀 Deployments” (部署管理) 按钮。
  • 状态指示: 卡片上增加小圆点指示器。
    • 🟢 Online: 有正在运行的部署版本。
    • Offline: 未部署。

B. 部署管理弹窗/抽屉 (Deployment Drawer)

点击 “Deployments” 后,从右侧滑出一个抽屉(Drawer),包含以下 Tab:

  1. Overview (概览):

    • Endpoint Info: 显示对外调用的 URL (如 https://api.domain.com/agent/run/my-flow-v1).
    • CURL Example: 一键复制调用代码示例 (Python/JS/CURL)。
    • Status Toggle: 一个 Switch 开关,用于一键 停用/启用 该接口。
    • Rate Limit Config: 这里的限流管理由只读变为可编辑
      • 输入框: Rate Limit (例如: 60/m, 10/s).
      • 按钮: Update Config.
  2. Version History (版本历史):

    • Timeline View: 垂直时间轴展示发布记录。
    • Item Content: 版本号 (v1, v2)、发布时间、备注信息。
    • Actions:
      • Rollback: 回滚到此版本(将此版本的 snapshot 覆盖为当前 active)。
      • View Config: 查看当时的配置快照。
  3. Playground (生产环境测试):

    • 不同于编辑器的调试,这是直接调用 AgentServer 接口的对话框。
    • 用于验证限流、Trace 注入是否生效。

1.2 国际化 (i18n) 交互设计

  • 语言切换器: 在 Langflow 页面右上角(User Profile 旁)增加一个地球图标 🌐。
  • 下拉菜单:
    • 🇺🇸 English
    • 🇨🇳 简体中文
  • 持久化: 语言偏好保存到 LocalStorage,下次访问自动加载。

2. 国际化 (i18n) 技术方案

Langflow 原生代码主要为英文且硬编码。为了实现汉化,我们需要引入 i18next 并在 React 组件中进行侵入式替换。

2.1 技术栈

  • 核心库: i18next, react-i18next
  • 检测器: i18next-browser-languagedetector

2.2 实现步骤

Step 1: 目录结构

在前端 src 目录下新建 locales

1
2
3
4
5
6
7
8
9
src/locales/
├── en/
│ ├── common.json # 通用词汇 (Save, Cancel, Delete)
│ ├── flow.json # Flow 相关
│ └── agent_server.json # 新增模块的词汇
└── zh/
├── common.json
├── flow.json
└── agent_server.json

示例 zh/agent_server.json:

1
2
3
4
5
6
7
8
{
"deploy_btn": "部署",
"rollback_success": "版本回退成功",
"rate_limit_label": "接口限流规则",
"rate_limit_placeholder": "例如: 60/m (每分钟60次)",
"status_active": "运行中",
"status_inactive": "已停止"
}

Step 2: 初始化配置 (src/i18n.js)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import i18n from 'i18next';
import { initReactI18next } from 'react-i18next';
import LanguageDetector from 'i18next-browser-languagedetector';
import enCommon from './locales/en/common.json';
import zhCommon from './locales/zh/common.json';
// ... import others

i18n
.use(LanguageDetector)
.use(initReactI18next)
.init({
resources: {
en: { common: enCommon, ... },
zh: { common: zhCommon, ... }
},
fallbackLng: 'en',
interpolation: { escapeValue: false }
});

export default i18n;

Step 3: 替换原有文本

这是工作量最大的部分。需要遍历 Langflow 的 React 组件,将硬编码文本替换为 Hook 调用。

修改前:

1
<button>Save Flow</button>

修改后:

1
2
3
4
5
6
import { useTranslation } from 'react-i18next';

export const FlowHeader = () => {
const { t } = useTranslation('flow');
return <button>{t('save_flow')}</button>;
}

3. 接口限流管理设计

3.1 后端逻辑完善

在之前的设计中,限流主要在 API 层读取。现在我们需要增加动态更新限流规则的逻辑。

  • 更新接口: PATCH /api/v1/agent-server/deployments/{id}
  • Redis 同步: 更新数据库后,需要清除或更新 Redis 中旧的限流计数器(可选,或者等待其自动过期)。

3.2 策略调整

使用 fastapi-limiter 时,由于规则是动态字符串(从 DB 读取),我们需要一个自定义的 Callable 作为 limit_func

1
2
3
4
5
6
async def get_dynamic_limit(request: Request, db: Session = Depends(get_session)):
deployment_id = request.path_params.get("id")
# 1. 优先查 Redis 缓存的配置 (减少 DB 压力)
# 2. Redis 没有则查 DB
deployment = ...
return deployment.rate_limit or "60/minute"

4. 测试系统设计与用例

为了保证生产环境的稳定性,必须引入自动化测试。

4.1 测试架构

  • 单元测试 (Unit Test): 测试 AgentService 的图构建、缓存逻辑。
  • 集成测试 (Integration Test): 启动 Docker 环境,测试 API 的完整调用链路。
  • 工具: pytest, pytest-asyncio, httpx (用于异步请求测试).

4.2 目录规划

1
2
3
4
5
tests/agent_server/
├── conftest.py # Fixtures (Mock DB, Mock Redis)
├── test_api.py # API 接口测试
├── test_execution.py # 核心执行引擎测试
└── test_rate_limit.py # 限流专项测试

4.3 核心测试用例开发 (Code Examples)

用例 1: 部署与版本控制 (Test Deployment Versioning)

目标: 验证每次发布,版本号是否自增,且 Snapshot 互不影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# tests/agent_server/test_deployment.py
import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_publish_flow_versioning(client: AsyncClient, json_flow_data):
# 1. 第一次发布
resp1 = await client.post("/api/v1/deployments/publish", json={
"flow_id": "flow-123",
"endpoint_slug": "test-agent",
"name": "Test Agent"
})
assert resp1.status_code == 200
assert resp1.json()["version"] == 1

# 2. 第二次发布
resp2 = await client.post("/api/v1/deployments/publish", json={
"flow_id": "flow-123",
"endpoint_slug": "test-agent",
"name": "Test Agent Updated"
})
assert resp2.status_code == 200
assert resp2.json()["version"] == 2
assert resp2.json()["id"] == resp1.json()["id"] # ID 相同,版本不同

用例 2: 限流功能验证 (Test Rate Limiting)

目标: 验证设定 2/second 后,第 3 次请求是否返回 429。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# tests/agent_server/test_rate_limit.py
import pytest
import asyncio
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_rate_limiting_enforcement(client: AsyncClient, active_deployment):
# 假设 active_deployment 限流设置为 "2/second"
slug = active_deployment.endpoint_slug

# 并发发送 3 个请求
tasks = [client.post(f"/api/v1/agent/{slug}", json={"input": "hi"}) for _ in range(3)]
responses = await asyncio.gather(*tasks)

status_codes = [r.status_code for r in responses]
# 应该有两个 200,一个 429
assert 429 in status_codes
assert status_codes.count(200) <= 2

用例 3: 缓存命中验证 (Test Cache Hit)

目标: 验证第二次调用是否跳过了 Graph Build 过程(可以通过 Mock Graph Build 方法来判断调用次数)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# tests/agent_server/test_execution.py
from unittest.mock import patch

@pytest.mark.asyncio
async def test_execution_cache(agent_service, deployment_obj):
# Mock 加载构建过程
with patch.object(agent_service, 'load_and_build_graph', wraps=agent_service.load_and_build_graph) as mock_build:
# 第一次执行
await agent_service.execute_agent(deployment_obj, {"input": "1"})
# 第二次执行
await agent_service.execute_agent(deployment_obj, {"input": "2"})

# 断言构建只发生了一次
assert mock_build.call_count == 1

5. 实施路线图 (Implementation Roadmap)

Phase 1: 国际化基础设施 (1-2 天)

  1. 安装 i18n 依赖。
  2. 创建中英文 JSON 语言包文件。
  3. 开发 LanguageSwitcher 组件并嵌入顶部导航栏。
  4. 先对侧边栏现有 Flow 列表页进行汉化验证。

Phase 2: 后端管理接口完善 (2-3 天)

  1. 修改 Deployment Model,确保 rate_limit 等字段可更新。
  2. 完善 PATCH /deployments/{id} 接口。
  3. 编写 tests/agent_server 下的单元测试,确保后端逻辑无误。

Phase 3: 前端部署中心开发 (3-4 天)

  1. 在 Flow 卡片增加 “Deployments” 入口。
  2. 开发右侧抽屉组件 (DeploymentDrawer)。
  3. 集成 API:
    • 读取历史版本。
    • 更新限流配置。
    • 版本回退操作。
  4. 对新增页面进行全量汉化。

Phase 4: 集成测试与交付 (1-2 天)

  1. 在本地 Docker 环境全链路跑通:配置 -> 发布 -> 汉化界面 -> 外部调用 -> 限流拦截 -> 查看 Langfuse Trace。
  2. 修复 UI 细节问题 (Loading 态, 错误提示)。

是否确认执行此 V2.0 详细设计?
确认后,我将首先为您提供 i18n 的初始化代码前端部署管理组件 的核心代码。

这是一个非常庞大的工程,整合了后端架构、数据库设计、缓存机制、流式协议、前端组件化以及国际化。

为了确保代码的可用性和结构清晰,我将按照模块化的方式提供代码。请严格按照文件路径在您的 Langflow 项目中创建或修改文件。


🛠️ 第一部分:后端开发 (Backend)

目录基础路径:src/backend/base/langflow/

1. 数据库模型定义

文件: services/database/models/agent_server.py (新建)

这里采用了主从表设计,支持版本管理和回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from datetime import datetime
from typing import Optional, Dict, Any, List
from uuid import UUID, uuid4
from sqlmodel import Field, SQLModel, JSON, Column, Relationship

# 部署主表:控制对外接口、限流和当前激活版本
class Deployment(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
flow_id: UUID = Field(index=True)
name: str
endpoint_slug: str = Field(unique=True, index=True)
description: Optional[str] = None
rate_limit: str = Field(default="60/minute")
is_active: bool = Field(default=True)
active_version: int = Field(default=1) # 当前对外服务的版本号

created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)

versions: List["DeploymentVersion"] = Relationship(back_populates="deployment")

# 版本快照表:存储每次发布的静态数据
class DeploymentVersion(SQLModel, table=True):
id: UUID = Field(default_factory=uuid4, primary_key=True)
deployment_id: UUID = Field(foreign_key="deployment.id")
version: int
graph_snapshot: Dict[str, Any] = Field(default={}, sa_column=Column(JSON)) # 冻结的图数据
production_config: Dict[str, Any] = Field(default={}, sa_column=Column(JSON)) # 覆盖配置
created_at: datetime = Field(default_factory=datetime.utcnow)

deployment: Optional[Deployment] = Relationship(back_populates="versions")

# Pydantic Models for API
class DeploymentCreate(SQLModel):
flow_id: UUID
name: str
endpoint_slug: str
production_config: Optional[Dict[str, Any]] = None

class DeploymentRead(SQLModel):
id: UUID
name: str
endpoint_slug: str
active_version: int
rate_limit: str
is_active: bool
updated_at: datetime

2. 核心服务逻辑 (Service Layer)

文件: services/agent_server/service.py (新建)

包含多级缓存、LangGraph 流式适配、Langfuse 注入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import logging
import os
import json
import asyncio
from typing import Any, Dict, Optional, Tuple, AsyncGenerator
from uuid import UUID

from async_lru import alru_cache
from langchain_core.runnables import Runnable, RunnableConfig
from langfuse.callback import CallbackHandler
from sqlmodel import Session, select

from langflow.graph.graph.base import Graph
from langflow.services.database.models.agent_server import Deployment, DeploymentVersion
from langflow.services.get_service import get_session

logger = logging.getLogger("agent_server")

class AgentServerService:
_cache = {} # 内存缓存 (DeploymentID_Version -> Runnable)

async def get_runnable(self, deployment: Deployment, db: Session) -> Runnable:
"""获取可执行对象 (带缓存)"""
# 查找当前激活的版本快照
version_record = db.exec(
select(DeploymentVersion)
.where(DeploymentVersion.deployment_id == deployment.id,
DeploymentVersion.version == deployment.active_version)
).first()

if not version_record:
raise ValueError(f"Version {deployment.active_version} not found for deployment {deployment.name}")

cache_key = f"{deployment.id}_v{version_record.version}"

if cache_key in self._cache:
return self._cache[cache_key]

# Cache Miss: 构建图
logger.info(f"Building graph for {cache_key}")
graph_data = version_record.graph_snapshot
prod_config = version_record.production_config

# 1. 从 Snapshot 恢复 Graph
graph = Graph.from_payload(graph_data)

# 2. 应用生产环境配置 (Tweaks)
if prod_config:
for node in graph.vertices:
if node.id in prod_config:
for key, val in prod_config[node.id].items():
node.params[key] = val

# 3. 编译
# 注意:这里适配 Langflow 的 build 逻辑
built_artifacts = await graph.build() if asyncio.iscoroutinefunction(graph.build) else graph.build()

# 4. 提取 Runnable (兼容 LangChain Chain 和 LangGraph CompiledGraph)
runnable = built_artifacts
if isinstance(built_artifacts, list):
# 尝试寻找主要入口
runnable = built_artifacts[0]

# 写入缓存
self._cache[cache_key] = runnable
return runnable

async def execute_stream(
self,
deployment: Deployment,
inputs: Dict[str, Any],
session_id: Optional[str],
db: Session
) -> AsyncGenerator[str, None]:
"""流式执行 (SSE)"""
try:
runnable = await self.get_runnable(deployment, db)

# 准备 Langfuse
callbacks = []
lf_handler = self._create_langfuse(
trace_name=f"agent-{deployment.name}",
session_id=session_id,
tags=[deployment.endpoint_slug, f"v{deployment.active_version}"]
)
if lf_handler:
callbacks.append(lf_handler)

config = RunnableConfig(
callbacks=callbacks,
configurable={"session_id": session_id}
)

# 判断是否支持 astream_events (LangGraph v0.1+)
if hasattr(runnable, "astream_events"):
async for event in runnable.astream_events(inputs, config=config, version="v2"):
event_type = event["event"]
data = event["data"]

if event_type == "on_chat_model_stream":
chunk = data.get("chunk")
if chunk and hasattr(chunk, "content") and chunk.content:
yield self._sse_pack("token", {"content": chunk.content})

elif event_type == "on_tool_start":
yield self._sse_pack("tool_start", {"tool": event["name"]})

elif event_type == "on_tool_end":
yield self._sse_pack("tool_end", {"output": str(data.get("output"))})

else:
# 普通 Chain 的流式回退
async for chunk in runnable.astream(inputs, config=config):
content = chunk.content if hasattr(chunk, "content") else str(chunk)
yield self._sse_pack("token", {"content": content})

yield self._sse_pack("end", "[DONE]")

except Exception as e:
logger.error(f"Execution error: {e}", exc_info=True)
yield self._sse_pack("error", str(e))

def _sse_pack(self, event: str, data: Any) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"

def _create_langfuse(self, **kwargs):
pk = os.getenv("LANGFUSE_PUBLIC_KEY")
sk = os.getenv("LANGFUSE_SECRET_KEY")
if pk and sk:
return CallbackHandler(public_key=pk, secret_key=sk, host=os.getenv("LANGFUSE_HOST"), **kwargs)
return None

agent_service = AgentServerService()

3. API 路由 (API Routers)

管理接口 (Management)

文件: api/v1/agent_mgmt.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from typing import List
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlmodel import Session, select
from langflow.services.get_service import get_session
from langflow.services.database.models.flow import Flow
from langflow.services.database.models.agent_server import Deployment, DeploymentVersion, DeploymentCreate, DeploymentRead

router = APIRouter(prefix="/deployments", tags=["Agent Management"])

@router.post("/publish", response_model=DeploymentRead)
def publish_flow(data: DeploymentCreate, session: Session = Depends(get_session)):
"""发布新版本"""
# 1. 获取 Flow 数据
flow = session.get(Flow, data.flow_id)
if not flow:
raise HTTPException(404, "Flow not found")

# 2. 检查或创建 Deployment
deployment = session.exec(select(Deployment).where(Deployment.endpoint_slug == data.endpoint_slug)).first()

if not deployment:
deployment = Deployment(
flow_id=data.flow_id,
name=data.name,
endpoint_slug=data.endpoint_slug,
active_version=1
)
session.add(deployment)
session.commit()
session.refresh(deployment)
new_version_num = 1
else:
# 版本自增
last_version = session.exec(
select(DeploymentVersion)
.where(DeploymentVersion.deployment_id == deployment.id)
.order_by(DeploymentVersion.version.desc())
).first()
new_version_num = (last_version.version + 1) if last_version else 1

# 更新 Deployment 信息
deployment.active_version = new_version_num
deployment.name = data.name
session.add(deployment)

# 3. 创建版本快照
version = DeploymentVersion(
deployment_id=deployment.id,
version=new_version_num,
graph_snapshot=flow.data.copy(), # 深度复制
production_config=data.production_config or {}
)
session.add(version)
session.commit()
return deployment

@router.patch("/{id}/config")
def update_config(id: str, rate_limit: str = Body(..., embed=True), is_active: bool = Body(..., embed=True), session: Session = Depends(get_session)):
"""更新限流配置或停用服务"""
dep = session.get(Deployment, id)
if not dep:
raise HTTPException(404)
dep.rate_limit = rate_limit
dep.is_active = is_active
session.add(dep)
session.commit()
return {"status": "updated"}

@router.post("/{id}/rollback")
def rollback_version(id: str, version: int = Body(..., embed=True), session: Session = Depends(get_session)):
"""版本回退"""
dep = session.get(Deployment, id)
ver = session.exec(select(DeploymentVersion).where(DeploymentVersion.deployment_id == id, DeploymentVersion.version == version)).first()
if not dep or not ver:
raise HTTPException(404, "Deployment or Version not found")

dep.active_version = version
session.add(dep)
session.commit()
return {"status": "rolled_back", "current_version": version}

@router.get("/flow/{flow_id}", response_model=List[DeploymentRead])
def get_deployments_by_flow(flow_id: str, session: Session = Depends(get_session)):
return session.exec(select(Deployment).where(Deployment.flow_id == flow_id)).all()

运行时接口 (Runtime)

文件: api/v1/agent_runtime.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from fastapi import APIRouter, Depends, HTTPException, Request, Body
from fastapi.responses import StreamingResponse
from fastapi_limiter.depends import RateLimiter
from sqlmodel import Session, select
from langflow.services.get_service import get_session
from langflow.services.database.models.agent_server import Deployment
from langflow.services.agent_server.service import agent_service

router = APIRouter(prefix="/agent", tags=["Agent Runtime"])

async def get_dynamic_rate_limit(request: Request, session: Session = Depends(get_session)):
"""从数据库动态获取限流规则"""
slug = request.path_params.get("slug")
dep = session.exec(select(Deployment).where(Deployment.endpoint_slug == slug)).first()
return dep.rate_limit if dep else "5/minute" # 默认严格限流

async def rate_limit_key(request: Request):
return f"{request.path_params.get('slug')}:{request.client.host}"

@router.post("/{slug}/stream", dependencies=[Depends(RateLimiter(key_func=rate_limit_key, limit_func=get_dynamic_rate_limit))])
async def stream_agent(
slug: str,
inputs: dict = Body(...),
session_id: str = Body(None),
db: Session = Depends(get_session)
):
"""对外流式接口"""
deployment = db.exec(select(Deployment).where(Deployment.endpoint_slug == slug, Deployment.is_active == True)).first()
if not deployment:
raise HTTPException(404, "Agent not found or inactive")

return StreamingResponse(
agent_service.execute_stream(deployment, inputs, session_id, db),
media_type="text/event-stream"
)

4. 注册 (Main Entry)

修改 src/backend/base/langflow/main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# ... existing imports
import redis.asyncio as redis
from fastapi_limiter import FastAPILimiter
from langflow.api.v1.agent_mgmt import router as mgmt_router
from langflow.api.v1.agent_runtime import router as runtime_router

# ... inside create_app
@app.on_event("startup")
async def startup():
# Redis for Rate Limiting
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379")
r = redis.from_url(redis_url, encoding="utf-8", decode_responses=True)
await FastAPILimiter.init(r)

app.include_router(mgmt_router)
app.include_router(runtime_router)

🎨 第二部分:前端开发 (Frontend)

目录基础路径:src/frontend/src/

注意:前端代码需要编译。你需要将这些文件放入源码并重新运行 npm start

1. 国际化配置 (i18n)

文件: src/i18n.js (新建)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import i18n from 'i18next';
import { initReactI18next } from 'react-i18next';
import LanguageDetector from 'i18next-browser-languagedetector';

const resources = {
en: {
translation: {
"deploy": "Deployments",
"publish": "Publish Version",
"rollback": "Rollback",
"rate_limit": "Rate Limit",
"status": "Status",
"active": "Active",
"inactive": "Inactive",
"endpoint_slug": "URL Slug",
"history": "Version History",
"update_success": "Configuration Updated"
}
},
zh: {
translation: {
"deploy": "部署管理",
"publish": "发布新版本",
"rollback": "版本回退",
"rate_limit": "接口限流规则",
"status": "状态",
"active": "运行中",
"inactive": "已停用",
"endpoint_slug": "访问路径 (Slug)",
"history": "版本历史",
"update_success": "配置已更新"
}
}
};

i18n
.use(LanguageDetector)
.use(initReactI18next)
.init({
resources,
fallbackLng: 'en',
interpolation: { escapeValue: false }
});

export default i18n;

src/index.tsx 中引入:import './i18n';

2. 部署管理抽屉组件 (Deployment Drawer)

这是一个复杂的 React 组件,用于管理部署。

文件: src/components/DeploymentDrawer/index.tsx (新建)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import React, { useState, useEffect } from 'react';
import { useTranslation } from 'react-i18next';
import { Sheet, SheetContent, SheetHeader, SheetTitle, Button, Input, Switch, Table } from "@/components/ui"; // 假设使用了 Shadcn UI
import { toast } from 'use-toast-hook'; // 假设

// API调用封装 (简写)
const api = {
getDeployments: (flowId) => fetch(`/api/v1/deployments/flow/${flowId}`).then(r => r.json()),
publish: (data) => fetch('/api/v1/deployments/publish', { method: 'POST', body: JSON.stringify(data), headers: {'Content-Type': 'application/json'} }),
updateConfig: (id, data) => fetch(`/api/v1/deployments/${id}/config`, { method: 'PATCH', body: JSON.stringify(data), headers: {'Content-Type': 'application/json'} }),
rollback: (id, version) => fetch(`/api/v1/deployments/${id}/rollback`, { method: 'POST', body: JSON.stringify({version}), headers: {'Content-Type': 'application/json'} }),
};

export default function DeploymentDrawer({ flowId, open, onClose }) {
const { t } = useTranslation();
const [deployments, setDeployments] = useState([]);
const [loading, setLoading] = useState(false);

// 表单状态
const [slug, setSlug] = useState("");
const [limit, setLimit] = useState("60/minute");

useEffect(() => {
if (open && flowId) loadData();
}, [open, flowId]);

const loadData = async () => {
setLoading(true);
const data = await api.getDeployments(flowId);
setDeployments(data);
if(data.length > 0) {
setSlug(data[0].endpoint_slug);
setLimit(data[0].rate_limit);
}
setLoading(false);
};

const handlePublish = async () => {
try {
await api.publish({
flow_id: flowId,
name: "Production Agent",
endpoint_slug: slug,
production_config: {}
});
toast({ title: t('publish_success') });
loadData();
} catch (e) {
toast({ title: "Error", variant: "destructive" });
}
};

const handleUpdateConfig = async (dep) => {
await api.updateConfig(dep.id, { rate_limit: limit, is_active: dep.is_active });
toast({ title: t('update_success') });
};

return (
<Sheet open={open} onOpenChange={onClose}>
<SheetContent className="w-[500px]">
<SheetHeader>
<SheetTitle>{t('deploy')}</SheetTitle>
</SheetHeader>

<div className="py-4 space-y-6">
{/* 发布区域 */}
<div className="p-4 border rounded bg-gray-50">
<h3 className="font-bold mb-2">{t('publish')}</h3>
<Input
placeholder={t('endpoint_slug')}
value={slug}
onChange={e => setSlug(e.target.value)}
className="mb-2"
/>
<Button onClick={handlePublish} disabled={loading}>{t('publish')}</Button>
</div>

{/* 列表与配置 */}
{deployments.map(dep => (
<div key={dep.id} className="space-y-4">
<div className="flex items-center justify-between">
<span className="font-mono text-sm bg-blue-100 p-1 rounded">POST /agent/{dep.endpoint_slug}/stream</span>
<Switch
checked={dep.is_active}
onCheckedChange={checked => {
dep.is_active = checked;
handleUpdateConfig(dep);
}}
/>
</div>

<div className="flex gap-2 items-center">
<label className="text-sm">{t('rate_limit')}:</label>
<Input
value={limit}
onChange={e => setLimit(e.target.value)}
className="w-32"
/>
<Button size="sm" variant="outline" onClick={() => handleUpdateConfig(dep)}>Save</Button>
</div>

{/* 版本历史 (简化版) */}
<div className="mt-4">
<h4 className="text-sm font-bold">{t('history')} (Current: v{dep.active_version})</h4>
<div className="max-h-40 overflow-y-auto mt-2">
{/* 实际应从后端获取 versions 列表 */}
<div className="flex justify-between text-sm py-1 border-b">
<span>v{dep.active_version}</span>
<span className="text-green-600">{t('active')}</span>
</div>
{dep.active_version > 1 && (
<div className="flex justify-between text-sm py-1 border-b">
<span>v{dep.active_version - 1}</span>
<Button size="xs" variant="ghost" onClick={() => api.rollback(dep.id, dep.active_version - 1)}>
{t('rollback')}
</Button>
</div>
)}
</div>
</div>
</div>
))}
</div>
</SheetContent>
</Sheet>
);
}

3. 集成到 Flow 列表页

找到 src/pages/FlowPage/components/FlowCard/index.tsx (路径可能随版本变化),添加按钮:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import DeploymentDrawer from '@/components/DeploymentDrawer';

// Inside FlowCard component
const [showDeploy, setShowDeploy] = useState(false);

return (
<>
{/* ... existing card content ... */}
<div className="menu-item" onClick={() => setShowDeploy(true)}>
🚀 {t('deploy')}
</div>

<DeploymentDrawer
open={showDeploy}
onClose={() => setShowDeploy(false)}
flowId={flow.id}
/>
</>
)

🧪 第三部分:测试 (Testing)

文件: tests/agent_server/test_integration.py

使用 pytest-asynciohttpx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import pytest
from httpx import AsyncClient

# 假设已经配置好了 conftest.py 提供 client fixture

@pytest.mark.asyncio
async def test_end_to_end_flow(client: AsyncClient, active_user):
# 1. 发布流程
# 假设我们有一个存在的 flow_id
flow_id = "existing-flow-uuid"

resp = await client.post("/api/v1/deployments/publish", json={
"flow_id": flow_id,
"name": "Integration Test Agent",
"endpoint_slug": "test-integration"
})
assert resp.status_code == 200
dep_data = resp.json()
assert dep_data["version"] == 1

# 2. 验证限流 (初始)
# 调用 Runtime 接口
slug = "test-integration"
resp = await client.post(f"/api/v1/agent/{slug}/stream", json={"input_value": "hi"})
# 应该是 200 (StreamingResponse start)
assert resp.status_code == 200

# 3. 修改限流配置
dep_id = dep_data["id"]
await client.patch(f"/api/v1/deployments/{dep_id}/config", json={
"rate_limit": "0/minute", # 禁止访问
"is_active": True
})

# 4. 再次调用应该被拦截
resp = await client.post(f"/api/v1/agent/{slug}/stream", json={"input_value": "hi"})
assert resp.status_code == 429 # Too Many Requests

# 5. 版本回退测试
# 再次发布产生 v2
await client.post("/api/v1/deployments/publish", json={
"flow_id": flow_id, "name": "v2", "endpoint_slug": "test-integration"
})

# 执行回退到 v1
rollback_resp = await client.post(f"/api/v1/deployments/{dep_id}/rollback", json={"version": 1})
assert rollback_resp.json()["current_version"] == 1

🐳 第四部分:Docker 部署

文件: docker-compose.yml (更新)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
version: '3'
services:
langflow:
build: .
ports:
- "7860:7860"
environment:
- DATABASE_URL=mysql+pymysql://root:root@mysql-server/langflow
- REDIS_URL=redis://redis-server:6379/0
- LANGFUSE_PUBLIC_KEY=pk-lf-...
- LANGFUSE_SECRET_KEY=sk-lf-...
- LANGFUSE_HOST=https://cloud.langfuse.com
depends_on:
- mysql-server
- redis-server

mysql-server:
image: mysql:8.0
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: langflow
volumes:
- db_data:/var/lib/mysql

redis-server:
image: redis:alpine

volumes:
db_data:

开发执行步骤提醒

  1. 后端: 先创建 Database Models,然后运行 Alembic 迁移数据库。
  2. 后端: 创建 Service 和 API Router,最后在 main.py 注册。
  3. 前端: 安装 npm install i18next react-i18next i18next-browser-languagedetector
  4. 前端: 创建语言包和 Drawer 组件,修改 FlowCard。
  5. 构建: 运行 npm run build 重新编译前端资源。
  6. 测试: 运行 pytest tests/agent_server/

这一套代码完整覆盖了您的需求,可以直接进入编码阶段。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !