
ReMe(Remember Me, Refine Me)是一个由阿里巴巴 AgentScope 团队开发的 AI 智能体记忆管理框架,基于 Python 3.10+,采用 Apache 2.0 开源协议。
它解决智能体记忆的两个核心痛点:
项目提供了两套记忆系统:
ReMe/
├── LICENSE # Apache 2.0 开源许可证
├── example.env # 环境变量示例文件
├── pyproject.toml # Python 项目配置(依赖、构建、入口点等)
│
├── reme/ # 📦 核心包 — 记忆系统主体
│ ├── __init__.py # 包初始化,定义版本号
│ ├── reme.py # 🗃️ 基于向量库的 ReMe 核心类(CRUD + 检索 + 总结)
│ ├── reme_light.py # 📁 基于文件的 ReMeLight 核心类(压缩 + 持久化 + 检索)
│ ├── reme_cli.py # 🖥️ 命令行交互工具
│ │
│ ├── config/ # ⚙️ 配置文件
│ │ ├── cli.yaml # CLI 配置
│ │ ├── light.yaml # ReMeLight 配置
│ │ ├── service.yaml # 服务端配置
│ │ ├── vector.yaml # 向量记忆配置
│ │ └── reme_config_parser.py # 配置解析器
│ │
│ ├── core/ # 🔧 核心基础设施
│ │ ├── application.py # 应用入口和生命周期管理
│ │ ├── prompt_handler.py # Prompt 处理器
│ │ ├── runtime_context.py # 运行时上下文
│ │ ├── service_context.py # 服务上下文
│ │ ├── registry_factory.py # 注册工厂模式
│ │ ├── base_dict.py # 基础字典工具
│ │ │
│ │ ├── as_llm/ # AgentScope LLM 适配层
│ │ ├── as_llm_formatter/ # LLM 消息格式化器
│ │ ├── as_token_counter/ # AgentScope Token 计数
│ │ ├── embedding/ # 🔢 Embedding 模块(向量化引擎)
│ │ ├── enumeration/ # 枚举类型定义
│ │ ├── file_store/ # 📂 文件存储(向量/FTS 索引)
│ │ ├── file_watcher/ # 👁️ 文件监控(Markdown 文件变更自动同步索引)
│ │ ├── flow/ # 🔄 流程控制(Op 编排引擎)
│ │ ├── llm/ # 🤖 LLM 抽象层(多后端适配)
│ │ ├── op/ # ⚡ 操作原子(Operation 基类和实现)
│ │ ├── schema/ # 📋 数据模型定义(Message、Memory 等)
│ │ ├── service/ # 🌐 HTTP/MCP 服务端
│ │ ├── token_counter/ # 🔢 Token 计数器(多模型适配)
│ │ ├── tools/ # 🛠️ 工具库(文件系统、Web 搜索、MCP 等)
│ │ ├── utils/ # 🧰 通用工具(日志、文本处理、计时器等)
│ │ └── vector_store/ # 💾 向量数据库抽象层(Local/Chroma/Qdrant/ES)
│ │
│ ├── memory/ # 🧠 记忆系统实现
│ │ ├── file_based/ # 📁 基于文件的记忆
│ │ │ ├── components/ # 核心组件:ContextChecker、Compactor、Summarizer、ToolResultCompactor
│ │ │ ├── tools/ # 工具:FileIO(读写编辑)、MemorySearch(混合检索)
│ │ │ ├── utils/ # 辅助工具
│ │ │ └── reme_in_memory_memory.py # 会话内存(Token 感知)
│ │ │
│ │ ├── vector_based/ # 🗃️ 基于向量的记忆
│ │ │ ├── base_memory_agent.py # 记忆 Agent 基类
│ │ │ ├── reme_retriever.py # 记忆检索器
│ │ │ ├── reme_summarizer.py # 记忆总结器
│ │ │ ├── personal/ # 个人记忆模块
│ │ │ ├── procedural/ # 程序性/任务记忆模块
│ │ │ └── tool_call/ # 工具调用记忆模块
│ │ │
│ │ └── vector_tools/ # 🛠️ 向量记忆工具集
│ │ ├── base_memory_tool.py # 记忆工具基类
│ │ ├── delegate_task.py # 任务委托工具
│ │ ├── history/ # 历史记忆工具
│ │ ├── profiles/ # 用户画像工具
│ │ └── record/ # 记录管理工具
│ │
│ └── extension/ # 🧩 扩展模块
│ ├── procedural_memory/ # 程序性记忆扩展(论文实现)
│ ├── simple_chat.py # 简单聊天扩展
│ ├── stream_chat.py # 流式聊天扩展
│ └── translate_ts.py # 翻译工具扩展
│
├── reme_ai/ # 📦 对外发布包(PyPI: reme-ai)
│ ├── __init__.py # 包初始化
│ ├── main.py # CLI 入口(reme 命令)
│ ├── agent/ # 🤖 ReAct Agent 实现
│ │ ├── react/ # ReAct 推理框架
│ │ └── tools/ # Agent 工具集
│ ├── config/ # 配置
│ ├── constants/ # 常量定义
│ ├── enumeration/ # 枚举类型
│ ├── schema/ # 数据模型
│ ├── service/ # 服务接口
│ ├── utils/ # 工具函数
│ ├── summary/ # 📝 记忆总结模块
│ │ ├── personal/ # 个人记忆总结器
│ │ ├── task/ # 任务记忆总结器
│ │ ├── tool/ # 工具记忆总结器
│ │ └── working/ # 工作记忆总结器
│ ├── retrieve/ # 🔍 记忆检索模块
│ │ ├── personal/ # 个人记忆检索
│ │ ├── task/ # 任务记忆检索
│ │ ├── tool/ # 工具记忆检索
│ │ └── working/ # 工作记忆检索
│ └── vector_store/ # 💾 向量存储操作
│ ├── recall_vector_store_op.py # 向量召回
│ ├── update_vector_store_op.py # 向量更新
│ └── vector_store_action_op.py # 向量操作封装
│
├── benchmark/ # 📊 基准测试与实验
│ ├── appworld/ # AppWorld 环境实验
│ ├── bfcl/ # BFCL-V3 工具调用实验
│ ├── halumem/ # HaluMem 幻觉记忆测试
│ ├── locomo/ # LoCoMo 长上下文记忆测试
│ └── longmemeval/ # LongMemEval 长期记忆评估
│
├── docs/ # 📚 文档
│ ├── index.md # 文档首页
│ ├── quick_start.md # 快速开始指南
│ ├── mcp_quick_start.md # MCP 服务快速开始
│ ├── installation.md # 安装指南
│ ├── contribution.md # 贡献指南
│ ├── vector_store_api_guide.md # 向量存储 API 指南
│ ├── cookbook/ # 使用示例
│ ├── library/ # 库参考文档
│ ├── cli/ # CLI 使用文档
│ ├── personal_memory/ # 个人记忆文档
│ ├── task_memory/ # 任务记忆文档
│ ├── tool_memory/ # 工具记忆文档
│ ├── work_memory/ # 工作记忆文档
│ ├── sop_memory/ # SOP 记忆文档
│ └── _static/ # 静态资源(图片、Logo 等)
│
├── test/ # 🧪 老版本测试(cookbook + test)
│ ├── cookbook/ # 用法示例
│ └── test/ # 测试脚本
│
└── tests/ # 🧪 单元测试
├── light/ # ReMeLight 测试
├── vector/ # 向量存储测试
├── test_embedding*.py # Embedding 测试
├── test_file_store.py # 文件存储测试
├── test_llm.py # LLM 测试
├── test_token_counter.py # Token 计数测试
├── test_vector_store.py # 向量存储测试
└── ... # 其他测试
ReMeLight 初始化时选择一个 file_store 后端:
│
├── 选项 A:SqliteFileStore → SQLite 单文件存储一切
│ ├── 向量检索:sqlite-vec 扩展(向量存在 SQLite 表中)
│ ├── 关键词检索:FTS5 trigram
│ └── 混合检索:两者加权融合
│
├── 选项 B:ChromaFileStore → Chroma 存储一切
│ ├── 向量检索:Chroma 原生向量搜索
│ ├── 关键词检索:Chroma $contains 过滤 + Python 评分
│ └── 混合检索:两者加权融合
│
└── 选项 C:LocalFileStore → 纯 Python 内存,无外部依赖
├── 向量检索:numpy 余弦相似度(内存计算)
├── 关键词检索:Python 子串匹配(遍历内存)
└── 混合检索:两者加权融合
第 ① 步:调用入口 — MemorySearch.call() memory_search.py
Agent 调用 memory_search 工具
│
└── MemorySearch.call(context, query="Python 版本偏好")
├── 1. before_execute() ← 前置钩子
├── 2. execute() ← 核心逻辑(见下)
└── 3. after_execute(response) ← 后置钩子
第 ② 步:参数提取与校验 — MemorySearch.execute()
query = "Python 版本偏好" # 从 context 提取
min_score = 0.1 # 最低分数门槛(默认 0.1)
max_results = 5 # 最终返回的 Top-K(默认 5)
第 ③ 步:进入混合检索 — file_store.hybrid_search() sqlite_file_store.py
hybrid_search(
query="Python 版本偏好",
limit=5, # Top-K
sources=[MemorySource.MEMORY], # 只搜 MEMORY.md 和 memory/*.md
vector_weight=0.7, # 向量权重
candidate_multiplier=3.0, # 候选池放大倍数
)
# 关键计算
candidates = min(200, max(1, int(5 * 3.0))) # = 15,两路各自召回 15 条
text_weight = 1.0 - 0.7 # = 0.3,关键词权重
这里的 candidate_multiplier 很重要:不是直接取 Top-5,而是先各取 Top-15 候选,融合后再取 Top-5。这是为了保证两路都有足够候选用于交叉融合。
第 ④ 步:关键词检索路径 — keyword_search()
vector_search("Python 版本偏好", limit=15, sources)
│
├── 5a. 获取查询向量
│ └── query_embedding = await embedding_model.get_embedding("Python 版本偏好")
│ └── 返回如 [0.012, -0.034, ...] 维度 1024 的浮点数组
│
├── 5b. 转换为 sqlite-vec 二进制格式
│ └── query_blob = vector_to_blob(query_embedding)
│
├── 5c. 执行 KNN 向量搜索
│ └── SQL:
│ SELECT c.*, v.distance
│ FROM vec_table v
│ JOIN chunks c ON v.id = c.id
│ WHERE v.embedding MATCH ? ← 向量匹配
│ AND k = 15 ← KNN 取最近 15 个
│ AND c.source IN (?) ← source 过滤
│ ORDER BY v.distance
│
├── 5d. L2 距离 → 相似度分数转换
│ └── score = max(0.0, 1.0 - distance / 2.0)
│ (归一化向量的 L2 距离范围 [0, 2] → 分数范围 [1, 0])
│
└── 返回:vector_results(最多 15 条,按 score 降序)
第 ⑥ 步:加权融合 — _merge_hybrid_results()
_merge_hybrid_results(
vector=vector_results, # 15 条向量结果
keyword=keyword_results, # 15 条关键词结果
vector_weight=0.7,
text_weight=0.3,
)
│
├── 6a. 向量结果加权
│ └── for each result: result.score *= 0.7
│ └── 以 merge_key 作为键存入 dict
│ merge_key = "{path}:{start_line}:{end_line}" 例:"MEMORY.md:5:12"
│
├── 6b. 关键词结果加权 + 去重合并
│ └── for each result:
│ if merge_key 已存在(同一 chunk 被两路都命中):
│ merged[key].score += result.score × 0.3 ← 叠加!
│ else:
│ result.score *= 0.3
│ merged[key] = result
│
├── 6c. 按融合分数降序排序
│
└── 返回:merged(去重后的结果列表,按 score 降序)
融合评分示例:
| chunk | 向量原始分 | 关键词原始分 | 向量 ×0.7 | 关键词 ×0.3 | 融合分 |
|---|---|---|---|---|---|
| A(两路都命中) | 0.85 | 0.80 | 0.595 | +0.240 | 0.835 |
| B(仅向量命中) | 0.72 | — | 0.504 | +0 | 0.504 |
| C(仅关键词命中) | — | 0.90 | 0 | 0.270 | 0.270 |
第 ⑦ 步:截断 Top-K —
return merged[:limit] # 取前 5 条(limit=5)
第 ⑧ 步:最低分数过滤 — 回到 MemorySearch.execute()\
results = [r for r in results if r.score >= min_score] # 默认 min_score=0.1
这一步在 Top-K 截断之后,过滤掉分数低于 0.1 的噪音结果。
总结:
Agent 调用 memory_search(query, max_results=5, min_score=0.1)
│
├── ① MemorySearch.call() ← 入口
├── ② MemorySearch.execute() ← 参数提取
├── ③ file_store.hybrid_search(limit=5, candidate_multiplier=3.0)
│ │
│ ├── candidates = 15 ← 候选池放大
│ │
│ ├── ④ keyword_search(limit=15) ← 关键词路径
│ │ ├── sanitize_fts_query() ← 清洗特殊字符
│ │ ├── FTS5 trigram MATCH ← BM25 排名
│ │ │ 或 LIKE 回退 ← Python 评分
│ │ └── 返回 15 条结果
│ │
│ ├── ⑤ vector_search(limit=15) ← 向量路径
│ │ ├── get_embedding() ← 向量化
│ │ ├── sqlite-vec KNN 搜索 ← L2 距离
│ │ ├── score = 1 - dist/2 ← 分数转换
│ │ └── 返回 15 条结果
│ │
│ ├── ⑥ _merge_hybrid_results() ← 加权融合
│ │ ├── 向量分 × 0.7 ← 向量加权
│ │ ├── 关键词分 × 0.3 ← 关键词加权
│ │ ├── 同 key 叠加分数 ← 去重合并
│ │ └── 按融合分降序排列
│ │
│ └── ⑦ merged[:5] ← 截断 Top-K
│
├── ⑧ filter(score >= 0.1) ← 最低分数过滤
│
│
└── ⑨ json.dumps(results) ← 序列化返回 Agent
ReMe.retrieve_memory(query, user_name="alice")
│
① 构建 Agent 体系(编排器 + 子 Agent + 工具)
│
② ReMeRetriever(编排器 Agent)
│ └── LLM 分析 query,决定分发给哪些子 Agent
│ └── 调用 DelegateTask 工具
│
③ DelegateTask(分发工具)
│ └── 并行启动多个子 Agent
│
④ PersonalRetriever / ProceduralRetriever / ToolRetriever(子 Agent)
│ └── ReAct 多轮推理:LLM 自主决定搜索策略
│ └── 阶段 1:多查询语义搜索(3~5 个不同表述)
│ └── 阶段 2(可选):带时间过滤的搜索
│ └── 阶段 3(可选):读取对话历史深挖
│
⑤ RetrieveMemory 工具
│ └── 按 memory_target 分组查询
│ └── MemoryHandler.batch_search()
│
⑥ MemoryHandler.batch_search()
│ ├── 普通模式:多查询去重
│ └── hybrid_threshold 模式:跨查询余弦相似度重打分
│
⑦ MemoryHandler.search()
│ └── 添加过滤条件(memory_type + memory_target + 时间)
│
⑧ VectorStore.search()(如 Qdrant)
│ └── query → Embedding → KNN 向量检索 → 返回 Top-K
│
⑨ 结果回传 → 去重 → 子 Agent 生成答案 → 编排器汇总
第 ① 步:构建 Agent 体系
result = await reme.retrieve_memory(
query="Python 编程偏好",
user_name="alice", # → PersonalRetriever
# task_name="code_writing", → ProceduralRetriever
# tool_name="pip", → ToolRetriever
retrieve_top_k=20, # 每次向量检索取 Top-20
)
根据传入的参数,构建 Agent 层级:
ReMeRetriever(编排器)
└── DelegateTask(分发工具)
├── PersonalRetriever(个人记忆 Agent)
│ ├── ReadAllProfiles ← 读用户画像
│ ├── RetrieveMemory ← 向量检索(top_k=20)
│ └── ReadHistory ← 读对话历史
├── ProceduralRetriever(任务记忆 Agent)
│ ├── RetrieveMemory
│ └── ReadHistory
└── ToolRetriever(工具记忆 Agent)
├── RetrieveMemory
└── ReadHistory
第 ② 步:ReMeRetriever 编排
reme_retriever.py
reme_retriever.yaml
编排器是一个 ReAct Agent(单步推理)
LLM 收到 system prompt 包含所有可用的 memory_target 列表
LLM 决定将查询分发给哪些 memory_target
只调用一次 delegate_task 工具,传入 [{"memory_target": "alice"}, ...]
第 ③ 步:DelegateTask 并行分发 delegate_task.py
# 去重 + 校验 memory_target
# 为每个 memory_target 复制一个子 Agent
# 异步并行提交所有子 Agent 任务
self.submit_async_task(agent.call, ...) # 并行!
await self.join_async_tasks() # 等待全部完成
关键:多个子 Agent 是并行执行的,不是串行。
第 ④ 步:子 Agent ReAct 多轮推理
以 PersonalRetriever 为例 (personal_retriever.yaml),它是一个 多轮 ReAct Agent,LLM 根据 prompt 中定义的策略自主决定搜索行为:
Phase 1 — 语义搜索(必做):
执行 3~5 个不同表述的查询:原始问题、改写变体、实体聚焦、关键词搜索、相关上下文
Phase 2 — 时间过滤搜索(可选):
仅当用户问题涉及时间时触发,格式如
20200101,20200102
Phase 3 — 历史深挖(可选):
使用 read_history 工具读取完整对话历史,最多读 3 条
⚠️ 这里和 ReMeLight 最大的区别:ReMeLight 的检索流程是代码硬编码的一次性调用;而 ReMe 的检索由 LLM 自主驱动多轮,LLM 决定搜几次、用什么查询词、是否需要时间过滤、是否要读历史。
第 ⑤ 步:RetrieveMemory 工具执行
retrieve_memory.py
# LLM 生成的多个查询项,例如:
query_items = [
{"query": "Python 编程偏好"},
{"query": "用户代码风格喜好"},
{"query": "alice 编程习惯"},
]
# 按 memory_target 分组
queries_by_target["alice"] = [
{"query": "Python 编程偏好", "limit": 20, "filters": {}},
{"query": "用户代码风格喜好", "limit": 20, "filters": {}},
{"query": "alice 编程习惯", "limit": 20, "filters": {}},
]
# 批量检索 + 去重
memory_nodes = await handler.batch_search(searches)
memory_nodes = deduplicate_memories(memory_nodes)
第 ⑥ 步:MemoryHandler.batch_search() memory_handler.py
两种模式:模式 A — 普通模式(hybrid_threshold=None,默认):
对每个查询分别调 self.search()
↓
所有结果按 memory_id 去重
↓
返回去重后的列表(无重新排序)
模式 B — hybrid_threshold 模式(传入阈值,如 0.3):
① 所有查询文本 → 批量 Embedding → query_embeddings [N × dim]
② 对每个查询分别调 self.search() → 所有结果去重
③ 提取结果的 embedding → result_embeddings [M × dim]
④ 计算余弦相似度矩阵 [N × M](每个查询 vs 每个结果)
⑤ 对每个结果取所有查询的平均相似度 → avg_scores [M]
⑥ 过滤 avg_score < threshold 的结果
⑦ 按 avg_score 降序排列
这个 hybrid_threshold 模式是一种跨查询重打分机制——一个结果如果和多个查询都相似,分数就越高。
第 ⑦ 步:MemoryHandler.search() memory_handler.py:search
# 自动注入过滤条件
filters["memory_type"] = "personal" # 记忆类型
filters["memory_target"] = "alice" # 记忆目标
# + 可选的时间过滤 filters["time_int"] = [20200101, 20200102]
# 调用向量库
vector_nodes = await self.vector_store.search(
query="Python 编程偏好",
limit=20,
filters=filters,
)
第 ⑧ 步:VectorStore.search()(底层向量检索)
# 1. 查询文本 → Embedding
query_vector = await self.get_embedding("Python 编程偏好")
# 2. 过滤条件转为 Qdrant Filter 对象
query_filter = self._create_filter(filters)
# → FieldCondition("memory_type", match="personal")
# → FieldCondition("memory_target", match="alice")
# 3. Qdrant KNN 向量搜索
results = await self.client.query_points(
collection_name=self.collection_name,
query=query_vector, # 向量
query_filter=query_filter, # 元数据过滤
limit=20, # Top-K
)
# 4. 返回结果(Qdrant 返回的 score 是余弦相似度)
注意:这里只有纯向量 KNN 检索 + 元数据过滤,没有任何关键词/全文检索。
第 ⑨ 步:结果回传与汇总
VectorStore.search() 返回 VectorNode[]
↓ 转换为 MemoryNode[]
MemoryHandler.batch_search() 去重
↓
RetrieveMemory 工具格式化输出给 LLM
↓
PersonalRetriever(子 Agent)LLM 生成回答
↓
DelegateTask 收集所有子 Agent 结果
↓
ReMeRetriever(编排器)汇总所有答案
↓
ReMe.retrieve_memory() 返回最终结果
安装:
| 方式 | 命令 | 说明 |
|---|---|---|
| PyPI 安装 | pip install reme-ai | 从 PyPI 安装完整版 |
| 源码安装(完整版) | pip install -e . | 包含向量库记忆系统 |
| 源码安装(轻量版) | pip install -e ".[light]" | 仅文件记忆系统(ReMeLight),依赖 AgentScope |
ReMe 提供了 HTTP REST API、MCP 协议、CLI 交互 三种部署方式,以及 Python SDK(ReMeLight、ReMe、ReMeApp)直接调用的方式,可以灵活适配不同的集成场景。
通过 reme 命令启动 HTTP REST API 服务,默认端口 8002:
reme \
backend=http \
http.port=8002 \
llm.default.model_name=qwen3.5-plus \
embedding_model.default.model_name=text-embedding-v4 \
vector_store.default.backend=memory
reme backend=mcp mcp.transport=sse mcp.port=8001
通过 remecli 命令启动交互式命令行聊天:
remecli
ReMeLight — 基于文件的记忆系统
from reme.reme_light import ReMeLight
reme = ReMeLight(
default_as_llm_config={"model_name": "qwen3.5-35b-a3b"},
default_file_store_config={"fts_enabled": True, "vector_enabled": False},
)
await reme.start()
# 核心 API:
await reme.compact_tool_result(messages) # 压缩超长工具输出
await reme.compact_memory(messages, ...) # 压缩对话为摘要
await reme.pre_reasoning_hook(messages, ...) # 推理前预处理钩子(一站式)
await reme.memory_search(query="...", max_results=5) # 语义记忆检索
await reme.close()
ReMe — 基于向量库的记忆系统
from reme import ReMe
reme = ReMe(
working_dir=".reme",
default_llm_config={"backend": "openai", "model_name": "qwen3.5-plus"},
default_embedding_model_config={"backend": "openai", "model_name": "text-embedding-v4", "dimensions": 1024},
default_vector_store_config={"backend": "local"}, # 支持 local/chroma/qdrant/elasticsearch
)
await reme.start()
# 核心 API:
await reme.summarize_memory(messages, user_name="alice") # 从对话提取记忆
await reme.retrieve_memory(query="...", user_name="alice") # 检索记忆
await reme.add_memory(memory_content="...", user_name="alice") # 手动添加
await reme.update_memory(memory_id="...", memory_content="...") # 更新
await reme.delete_memory(memory_id="...") # 删除
await reme.list_memory(user_name="alice") # 列出所有
await reme.close()
ReMeApp — Flow 执行模式
核心类 ReMeApp,通过 Flow 编排执行记忆操作:
from reme_ai.main import ReMeApp
app = ReMeApp(
"llm.default.model_name=qwen3.5-plus",
"vector_store.default.backend=memory",
)
# 同步执行
result = app.execute("retrieve_task_memory", query="Python debugging", top_k=5)
# 异步执行
result = await app.async_execute("summary_task_memory", trajectories=[...])
在 service.yaml 中定义了以下可用的 Flow,在 HTTP 模式下对应为 API 端点,在 MCP 模式下对应为 MCP Tool:
| Flow 名称 | 功能 |
|---|---|
retrieve_task_memory | 检索相关任务记忆 |
summary_task_memory | 从对话轨迹总结记忆 |
add_task_memory | 添加任务记忆 |
delete_task_memory | 删除任务记忆 |
record_task_memory | 更新记忆元数据(频次/效用) |
load_memory | 从磁盘加载记忆 |
dump_memory | 导出记忆到磁盘 |
simple_chat | 简单聊天 |
stream_chat | 流式聊天(SSE) |
ReMe 的 Flow 系统是一套 流程编排引擎,用于将多个 Op(操作算子)组合成可执行的工作流,并通过 HTTP/MCP/命令行等方式对外暴露。
是一个 配置驱动的流程编排引擎:通过 YAML 中的表达式字符串(>> 串行、| 并行)组合可复用的 Op 算子,自动暴露为 HTTP/MCP 接口,支持缓存、流式、重试等生产级特性。
graph TD
Config["service.yaml<br>FlowConfig 定义"] --> ExpressionFlow
ExpressionFlow["ExpressionFlow<br>解析表达式字符串"]
CmdFlow["CmdFlow<br>命令行指定表达式"]
ExpressionFlow --> BaseFlow
CmdFlow --> BaseFlow
BaseFlow["BaseFlow<br>抽象基类"] --> |构建| OpTree["Op 操作树"]
OpTree --> SequentialOp["SequentialOp<br>顺序执行 >>"]
OpTree --> ParallelOp["ParallelOp<br>并行执行 |"]
OpTree --> BaseOp["具体 Op 算子"]
BaseFlow --> |注册到| Service["Service 层"]
Service --> HttpService["HttpService<br>FastAPI REST/SSE"]
Service --> MCPService["MCPService<br>MCP Tools"]
Flow 使用 Python 操作符重载 构建 Op 编排:
| 操作符 | 含义 | 对应类 | 示例 |
|---|---|---|---|
>> | 顺序执行 | SequentialOp | A() >> B() >> C() |
| | 并行执行 | ParallelOp | |
| 组合 | 混合编排 | 嵌套 | A() >> (B()| C()) >> D() |
例如
service.yaml 中定义的:
summary_task_memory:
flow_content: TrajectoryPreprocess() >> (SuccessExtraction()|FailureExtraction()|ComparativeExtraction()) >> MemoryValidation() >> MemoryDeduplication()
解析后的执行结构:
sequential
TrajectoryPreprocess # 1. 预处理轨迹
parallel # 2. 并行提取
SuccessExtraction # - 成功经验提取
FailureExtraction # - 失败教训提取
ComparativeExtraction # - 对比分析提取
MemoryValidation # 3. 记忆验证
MemoryDeduplication # 4. 记忆去重
在 service.yaml 中定义了 9 个 Flow:
| Flow 名称 | 表达式 | 功能 | 流式 |
|---|---|---|---|
retrieve_task_memory | BuildQuery() >> MemoryRetrieval() >> RerankMemory() >> RewriteMemory() | 检索相关任务记忆 | ❌ |
summary_task_memory | TrajectoryPreprocess() >> (SuccessExtraction() | FailureExtraction() | ComparativeExtraction()) >> MemoryValidation() >> MemoryDeduplication() | ||
add_task_memory | MemoryAddition() | 添加任务记忆到向量库 | ❌ |
delete_task_memory | MemoryDeletion() | 按条件删除任务记忆 | ❌ |
record_task_memory | UpdateMemoryMetadata() | 更新记忆的频次/效用元数据 | ❌ |
load_memory | LoadMemory() | 从磁盘加载记忆 | ❌ |
dump_memory | DumpMemory() | 导出记忆到磁盘 | ❌ |
simple_chat | SimpleChat() | 简单聊天 | ❌ |
stream_chat | StreamChat() | 流式聊天 | ✅ |
Flow 由 Op 组成,所有 Op 定义在 reme/core/op/目录:
| Op 类型 | 类名 | 功能 |
|---|---|---|
| 基础算子 | BaseOp | 所有 Op 的基类,提供生命周期(before_execute → execute → after_execute)、重试、缓存、资源访问 |
| 顺序执行 | SequentialOp | 串行执行子 Op |
| 并行执行 | ParallelOp | 并发执行子 Op(asyncio.gather) |
| ReAct 模式 | BaseReact / BaseReactStream | 推理-行动循环,支持工具调用 |
| 工具算子 | BaseTool / MCPTool | 可被 ReAct Agent 调用的工具 |
| Ray 算子 | BaseRayOp | 分布式执行支持 |
已注册的业务 Op(通过 R.ops.register):
来自 procedural_memory:
extension:每个 Op 调用时经历标准的三阶段:
graph LR
Call["call(context)"] --> Before["before_execute<br>输入映射 + 配置加载"]
Before --> Execute["execute<br>核心逻辑"]
Execute --> After["after_execute<br>输出映射 + 响应写入"]
Execute -.-> |重试| Before
before_execute:应用 input_mapping 变量映射,从 ServiceConfig.ops 加载 Op 级参数/Prompt 覆盖
execute:子类实现的核心业务逻辑
after_execute:应用 output_mapping,将返回值写入 Response
graph LR
YAML["service.yaml<br>flows 配置"] --> Init["Application.init_flows()"]
Init --> |解析 FlowConfig| EF["ExpressionFlow 实例化"]
EF --> SC["ServiceContext.flows 注册"]
SC --> |"run_service()"| Service["HttpService / MCPService"]
Service --> |"integrate_flow()"| Endpoint["HTTP POST 端点 / MCP Tool"]