Claude-Mem系列(5):异步队列与容错机制
一、为什么需要异步处理?
1.1 同步处理的问题
1场景:Agent执行文件编辑
2
3同步方式:
4 工具调用 → 等待处理完成 → 返回结果
5 ↓
6 可能耗时500ms-2s
7 ↓
8 Agent被阻塞 😤
9
10如果每个工具调用都这样:
11 10个工具调用 × 1秒 = 10秒额外等待
12
1.2 异步处理的优势
1异步方式:
2 工具调用 → 入队 → 立即返回
3 ↓
4 后台异步处理
5 ↓
6 Agent继续工作 😊
7
8总等待时间:接近0
9
1.3 Claude-Mem的设计原则
核心原则:Worker不可用时永远不会阻塞用户的Claude Code会话。
这意味着:
- Hook执行必须快速返回
- 失败不应影响Agent
- 数据最终一致性(非实时)
二、PendingMessageStore架构
2.1 数据库表结构
1CREATE TABLE pending_messages (
2 id TEXT PRIMARY KEY,
3
4 -- 消息类型
5 message_type TEXT NOT NULL
6 CHECK(message_type IN ('observation', 'summary', 'session_init', 'session_complete')),
7
8 -- 会话关联
9 memory_session_id TEXT NOT NULL,
10 project_id TEXT NOT NULL,
11
12 -- 消息内容
13 payload TEXT NOT NULL, -- JSON格式
14
15 -- 处理状态
16 status TEXT NOT NULL DEFAULT 'pending'
17 CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
18
19 -- 重试机制
20 retry_count INTEGER NOT NULL DEFAULT 0,
21 max_retries INTEGER NOT NULL DEFAULT 3,
22
23 -- 时间戳
24 created_at_epoch INTEGER NOT NULL,
25 updated_at_epoch INTEGER NOT NULL,
26 processed_at_epoch INTEGER,
27
28 -- 错误信息
29 error_message TEXT,
30
31 FOREIGN KEY(memory_session_id) REFERENCES server_sessions(memory_session_id)
32);
33
34-- 索引
35CREATE INDEX idx_pending_messages_status ON pending_messages(status);
36CREATE INDEX idx_pending_messages_session ON pending_messages(memory_session_id);
37CREATE INDEX idx_pending_messages_created ON pending_messages(created_at_epoch);
38
2.2 入队操作
1// PostToolUse Hook调用
2async function enqueueObservation(context: HookContext): Promise<void> {
3 const observation = {
4 memorySessionId: context.memorySessionId,
5 projectId: context.projectId,
6 type: context.toolName,
7 title: generateTitle(context),
8 narrative: generateNarrative(context),
9 facts: extractFacts(context),
10 filesModified: context.filesModified
11 };
12
13 // 插入pending_messages表
14 await db.prepare(`
15 INSERT INTO pending_messages
16 (id, message_type, memory_session_id, project_id, payload, status, created_at_epoch, updated_at_epoch)
17 VALUES (?, 'observation', ?, ?, ?, 'pending', ?, ?)
18 `).run(
19 generateId(),
20 observation.memorySessionId,
21 observation.projectId,
22 JSON.stringify(observation),
23 Date.now(),
24 Date.now()
25 );
26}
27
关键点:
- 插入即返回,不等待处理
- 使用事务保证原子性
- payload为完整的消息内容
2.3 出队操作
1// Worker后台处理
2class MessageProcessor {
3 async processNextBatch(): Promise<void> {
4 // 获取待处理消息
5 const messages = await db.prepare(`
6 SELECT * FROM pending_messages
7 WHERE status = 'pending'
8 ORDER BY created_at_epoch ASC
9 LIMIT 10
10 `).all();
11
12 if (messages.length === 0) return;
13
14 // 标记为处理中
15 const ids = messages.map(m => m.id);
16 await db.prepare(`
17 UPDATE pending_messages
18 SET status = 'processing', updated_at_epoch = ?
19 WHERE id IN (${ids.map(() => '?').join(',')})
20 `).run(Date.now(), ...ids);
21
22 // 处理每个消息
23 for (const message of messages) {
24 try {
25 await this.processMessage(message);
26
27 // 标记为完成
28 await db.prepare(`
29 UPDATE pending_messages
30 SET status = 'completed', processed_at_epoch = ?, updated_at_epoch = ?
31 WHERE id = ?
32 `).run(Date.now(), Date.now(), message.id);
33
34 } catch (error) {
35 await this.handleFailure(message, error);
36 }
37 }
38 }
39
40 async processMessage(message: PendingMessage): Promise<void> {
41 const payload = JSON.parse(message.payload);
42
43 switch (message.message_type) {
44 case 'observation':
45 await this.processObservation(payload);
46 break;
47 case 'summary':
48 await this.processSummary(payload);
49 break;
50 case 'session_init':
51 await this.processSessionInit(payload);
52 break;
53 case 'session_complete':
54 await this.processSessionComplete(payload);
55 break;
56 }
57 }
58}
59
2.4 失败处理
1async handleFailure(message: PendingMessage, error: Error): Promise<void> {
2 const newRetryCount = message.retry_count + 1;
3
4 if (newRetryCount >= message.max_retries) {
5 // 超过重试次数,标记为失败
6 await db.prepare(`
7 UPDATE pending_messages
8 SET status = 'failed',
9 retry_count = ?,
10 error_message = ?,
11 updated_at_epoch = ?
12 WHERE id = ?
13 `).run(newRetryCount, error.message, Date.now(), message.id);
14
15 // 记录失败事件
16 await logEvent({
17 type: 'message_processing_failed',
18 messageId: message.id,
19 error: error.message
20 });
21
22 } else {
23 // 重新入队,等待重试
24 await db.prepare(`
25 UPDATE pending_messages
26 SET status = 'pending',
27 retry_count = ?,
28 error_message = ?,
29 updated_at_epoch = ?
30 WHERE id = ?
31 `).run(newRetryCount, error.message, Date.now(), message.id);
32 }
33}
34
三、Generator重启机制
3.1 什么是Generator?
Claude-Mem使用Generator模式处理流式响应:
1// SDK Agent的Generator
2async function* processSession(memorySessionId: string) {
3 // 持续监听新消息
4 while (true) {
5 const messages = await getPendingMessages(memorySessionId);
6
7 if (messages.length === 0) {
8 // 没有新消息,等待
9 await sleep(1000);
10 continue;
11 }
12
13 // 处理消息并yield结果
14 for (const message of messages) {
15 const result = await processMessage(message);
16 yield result;
17 }
18 }
19}
20
3.2 Generator崩溃场景
1场景1:网络超时
2 Generator正在处理消息
3 → 调用OpenAI API超时
4 → Generator抛出异常
5 → 需要重启
6
7场景2:内存不足
8 Generator处理大量消息
9 → 内存占用过高
10 → 进程被OOM Killer终止
11 → 需要重启
12
13场景3:数据库锁定
14 Generator写入SQLite
15 → 数据库被锁定
16 → 抛出SQLITE_BUSY错误
17 → 需要重启
18
3.3 重启策略:指数退避
1class GeneratorManager {
2 private restartCount = 0;
3 private maxConsecutiveRestarts = 3;
4 private generator: AsyncGenerator | null = null;
5
6 async start(memorySessionId: string): Promise<void> {
7 while (true) {
8 try {
9 this.generator = processSession(memorySessionId);
10
11 // 消费Generator
12 for await (const result of this.generator) {
13 // 处理结果
14 await this.handleResult(result);
15
16 // 成功处理,重置计数器
17 this.restartCount = 0;
18 }
19
20 // Generator正常结束
21 break;
22
23 } catch (error) {
24 this.restartCount++;
25
26 if (this.restartCount > this.maxConsecutiveRestarts) {
27 // 连续重启过多,停止
28 console.error('Too many consecutive restarts, stopping');
29 break;
30 }
31
32 // 指数退避:1s, 2s, 4s, 8s...
33 const delay = Math.pow(2, this.restartCount - 1) * 1000;
34 console.log(`Generator crashed, restarting in ${delay}ms...`);
35 await sleep(delay);
36 }
37 }
38 }
39}
40
关键设计:
1成功处理消息 → restartCount = 0
2Generator崩溃 → restartCount++
3 → 等待 2^(n-1) 秒
4 → 重启Generator
5
6连续重启 > 3次 → 停止,让迭代器结束
7
3.4 为什么这样设计?
1场景:临时网络问题
2
3传统方式:
4 Generator崩溃 → 立即重启 → 又崩溃 → 立即重启...
5 → 快速消耗资源,问题未解决
6
7指数退避:
8 Generator崩溃 → 等1秒 → 重启
9 又崩溃 → 等2秒 → 重启
10 又崩溃 → 等4秒 → 重启
11 网络恢复 → 成功 → 重置计数器
12
13 → 给系统恢复的时间
14 → 避免资源浪费
15
3.5 消息不丢失保证
1Generator崩溃时:
2 1. 正在处理的消息:
3 - 状态是'processing'
4 - 重启后会重新获取(因为超时机制)
5
6 2. 队列中的消息:
7 - 状态是'pending'
8 - 重启后正常处理
9
10 3. 已处理的消息:
11 - 状态是'completed'
12 - 不会被重复处理
13
四、优雅降级设计
4.1 错误分类
1// 错误类型定义
2enum ErrorType {
3 TRANSPORT = 'transport', // 传输错误
4 CLIENT = 'client', // 客户端错误
5 SERVER = 'server', // 服务端错误
6 TIMEOUT = 'timeout', // 超时
7 UNKNOWN = 'unknown' // 未知错误
8}
9
10// 错误分类逻辑
11function classifyError(error: Error): ErrorType {
12 if (error.code === 'ECONNREFUSED' || error.code === 'ENOTFOUND') {
13 return ErrorType.TRANSPORT;
14 }
15
16 if (error.statusCode >= 400 && error.statusCode < 500) {
17 return ErrorType.CLIENT;
18 }
19
20 if (error.statusCode >= 500) {
21 return ErrorType.SERVER;
22 }
23
24 if (error.name === 'TimeoutError') {
25 return ErrorType.TIMEOUT;
26 }
27
28 return ErrorType.UNKNOWN;
29}
30
4.2 降级策略
1// Hook中的错误处理
2async function hookWithErrorHandling(
3 hookName: string,
4 handler: () => Promise<HookResponse>
5): Promise<HookResponse> {
6 try {
7 return await handler();
8
9 } catch (error) {
10 const errorType = classifyError(error);
11
12 switch (errorType) {
13 case ErrorType.TRANSPORT:
14 // Worker不可用
15 log.warn(`${hookName}: Worker unavailable, continuing without memory`);
16 return { continue: true }; // 不阻塞
17
18 case ErrorType.TIMEOUT:
19 // 超时
20 log.warn(`${hookName}: Timeout, continuing without memory`);
21 return { continue: true }; // 不阻塞
22
23 case ErrorType.SERVER:
24 // 服务端错误
25 log.error(`${hookName}: Server error`, error);
26 return { continue: true }; // 不阻塞
27
28 case ErrorType.CLIENT:
29 // 客户端错误:可能是bug
30 log.error(`${hookName}: Client error`, error);
31 return {
32 continue: false,
33 error: `Hook error: ${error.message}`
34 }; // 阻塞,提示修复
35
36 default:
37 // 未知错误:保守处理
38 log.error(`${hookName}: Unknown error`, error);
39 return { continue: true };
40 }
41 }
42}
43
4.3 降级矩阵
| 错误类型 | 是否阻塞Agent | 原因 |
|---|---|---|
| 传输错误 | ❌ 不阻塞 | Worker不可用,但Agent仍可工作 |
| 超时 | ❌ 不阻塞 | 网络问题,不应影响用户体验 |
| 服务端错误 | ❌ 不阻塞 | 服务端问题,用户无法解决 |
| 客户端错误 | ✅ 阻塞 | 可能是配置或代码bug,需要修复 |
| 未知错误 | ❌ 不阻塞 | 保守处理,避免意外阻塞 |
4.4 为什么”永不阻塞”?
1场景:Worker服务崩溃
2
3如果阻塞:
4 用户:输入提示
5 Claude Code:等待Hook响应... (卡住)
6 用户:😭 什么都做不了,只能重启Claude Code
7
8如果优雅降级:
9 用户:输入提示
10 Claude Code:Hook失败,继续执行(无记忆注入)
11 用户:😊 正常工作,只是这次没有历史上下文
12 用户:可以稍后手动重启Worker
13
核心洞察:记忆是增强,不是必需。没有记忆,Agent仍然可以工作。
五、Session生命周期管理
5.1 状态机
1┌─────────────────────────────────────────────────────────────────┐
2│ Session状态机 │
3├─────────────────────────────────────────────────────────────────┤
4│ │
5│ [created] ──→ [active] ──→ [completed] │
6│ │ │
7│ └──→ [failed] │
8│ │
9│ 状态转换: │
10│ - created → active: UserPromptSubmit Hook │
11│ - active → completed: SessionEnd Hook (成功) │
12│ - active → failed: SessionEnd Hook (失败) 或 超时 │
13│ │
14└─────────────────────────────────────────────────────────────────┘
15
5.2 超时处理
1// 会话超时检测
2class SessionTimeoutChecker {
3 private checkInterval = 60000; // 1分钟
4 private sessionTimeout = 3600000; // 1小时
5
6 async check(): Promise<void> {
7 const cutoffEpoch = Date.now() - this.sessionTimeout;
8
9 // 查找超时的活跃会话
10 const timeoutSessions = await db.prepare(`
11 SELECT memory_session_id FROM server_sessions
12 WHERE status = 'active'
13 AND updated_at_epoch < ?
14 `).all(cutoffEpoch);
15
16 // 标记为失败
17 for (const session of timeoutSessions) {
18 await db.prepare(`
19 UPDATE server_sessions
20 SET status = 'failed', completed_at_epoch = ?, updated_at_epoch = ?
21 WHERE memory_session_id = ?
22 `).run(Date.now(), Date.now(), session.memory_session_id);
23
24 log.warn(`Session ${session.memory_session_id} timed out`);
25 }
26 }
27}
28
5.3 队列排空
1// Session结束时排空队列
2async function drainQueue(memorySessionId: string): Promise<void> {
3 const maxWaitTime = 30000; // 最多等待30秒
4 const startTime = Date.now();
5
6 while (true) {
7 // 检查是否还有待处理消息
8 const pendingCount = await db.prepare(`
9 SELECT COUNT(*) as count FROM pending_messages
10 WHERE memory_session_id = ?
11 AND status IN ('pending', 'processing')
12 `).get(memorySessionId);
13
14 if (pendingCount.count === 0) {
15 // 队列已空
16 break;
17 }
18
19 if (Date.now() - startTime > maxWaitTime) {
20 // 超时,强制完成
21 log.warn(`Queue drain timeout for session ${memorySessionId}`);
22 break;
23 }
24
25 // 等待一下再检查
26 await sleep(1000);
27 }
28}
29
六、监控与告警
6.1 队列监控
1// 队列健康检查
2interface QueueHealth {
3 pending: number;
4 processing: number;
5 failed: number;
6 oldestPendingAge: number; // 最老的pending消息的年龄(秒)
7}
8
9async function getQueueHealth(): Promise<QueueHealth> {
10 const stats = await db.prepare(`
11 SELECT
12 status,
13 COUNT(*) as count,
14 MIN(created_at_epoch) as oldest
15 FROM pending_messages
16 WHERE status IN ('pending', 'processing', 'failed')
17 GROUP BY status
18 `).all();
19
20 const result: QueueHealth = {
21 pending: 0,
22 processing: 0,
23 failed: 0,
24 oldestPendingAge: 0
25 };
26
27 for (const stat of stats) {
28 result[stat.status] = stat.count;
29 if (stat.status === 'pending' && stat.oldest) {
30 result.oldestPendingAge = (Date.now() - stat.oldest) / 1000;
31 }
32 }
33
34 return result;
35}
36
6.2 告警规则
1// 告警检查
2async function checkAlerts(): Promise<Alert[]> {
3 const alerts: Alert[] = [];
4 const health = await getQueueHealth();
5
6 // 规则1:pending消息过多
7 if (health.pending > 100) {
8 alerts.push({
9 level: 'warning',
10 message: `Pending queue has ${health.pending} messages`
11 });
12 }
13
14 // 规则2:failed消息过多
15 if (health.failed > 10) {
16 alerts.push({
17 level: 'error',
18 message: `${health.failed} messages failed to process`
19 });
20 }
21
22 // 规则3:最老消息过旧
23 if (health.oldestPendingAge > 300) { // 5分钟
24 alerts.push({
25 level: 'warning',
26 message: `Oldest pending message is ${health.oldestPendingAge}s old`
27 });
28 }
29
30 return alerts;
31}
32
七、性能优化
7.1 批量处理
1// 批量处理消息
2async function processBatch(messages: PendingMessage[]): Promise<void> {
3 // 按类型分组
4 const grouped = groupBy(messages, 'message_type');
5
6 // 批量处理每种类型
7 for (const [type, typeMessages] of Object.entries(grouped)) {
8 switch (type) {
9 case 'observation':
10 await processObservationsBatch(typeMessages);
11 break;
12 case 'summary':
13 await processSummariesBatch(typeMessages);
14 break;
15 }
16 }
17}
18
19// 批量插入memory_items
20async function processObservationsBatch(messages: PendingMessage[]): Promise<void> {
21 const observations = messages.map(m => JSON.parse(m.payload));
22
23 const insertMany = db.transaction(() => {
24 const stmt = db.prepare(`
25 INSERT INTO memory_items
26 (id, project_id, server_session_id, kind, type, title, narrative, facts, concepts, created_at_epoch)
27 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
28 `);
29
30 for (const obs of observations) {
31 stmt.run(
32 generateId(), obs.projectId, obs.sessionId,
33 'observation', obs.type, obs.title, obs.narrative,
34 JSON.stringify(obs.facts), JSON.stringify(obs.concepts),
35 Date.now()
36 );
37 }
38 });
39
40 insertMany();
41}
42
7.2 连接池
1// SQLite连接池
2class SQLitePool {
3 private pool: Database[] = [];
4 private maxConnections = 5;
5
6 async acquire(): Promise<Database> {
7 if (this.pool.length > 0) {
8 return this.pool.pop()!;
9 }
10
11 if (this.pool.length < this.maxConnections) {
12 return createConnection();
13 }
14
15 // 等待可用连接
16 return new Promise((resolve) => {
17 const check = () => {
18 if (this.pool.length > 0) {
19 resolve(this.pool.pop()!);
20 } else {
21 setTimeout(check, 10);
22 }
23 };
24 check();
25 });
26 }
27
28 release(conn: Database): void {
29 this.pool.push(conn);
30 }
31}
32
八、总结
Claude-Mem的异步队列与容错机制确保了:
- 永不阻塞Agent:Hook快速返回,后台异步处理
- 消息不丢失:pending_messages表保证持久化
- 优雅降级:错误分类处理,传输错误不阻塞
- 自动恢复:Generator重启机制,指数退避
- 最终一致性:队列排空保证数据完整
关键设计原则:
- 记忆是增强,不是必需
- 失败不应传播到用户
- 最终一致性优于实时一致性