Claude-Mem系列(5):异步队列与容错机制

前置阅读Claude-Mem系列(4):搜索架构 - 渐进式披露与Token优化

一、为什么需要异步处理?

1.1 同步处理的问题

1场景:Agent执行文件编辑
2
3同步方式:
4  工具调用 → 等待处理完成 → 返回结果
5
6            可能耗时500ms-2s
7
8            Agent被阻塞 😤
9
10如果每个工具调用都这样:
11  10个工具调用 × 1秒 = 10秒额外等待
12

1.2 异步处理的优势

1异步方式:
2  工具调用 → 入队 → 立即返回
3
4            后台异步处理
5
6            Agent继续工作 😊
7
8总等待时间:接近0
9

1.3 Claude-Mem的设计原则

核心原则:Worker不可用时永远不会阻塞用户的Claude Code会话

这意味着:

  1. Hook执行必须快速返回
  2. 失败不应影响Agent
  3. 数据最终一致性(非实时)

二、PendingMessageStore架构

2.1 数据库表结构

1CREATE TABLE pending_messages (
2  id TEXT PRIMARY KEY,
3  
4  -- 消息类型
5  message_type TEXT NOT NULL 
6    CHECK(message_type IN ('observation', 'summary', 'session_init', 'session_complete')),
7  
8  -- 会话关联
9  memory_session_id TEXT NOT NULL,
10  project_id TEXT NOT NULL,
11  
12  -- 消息内容
13  payload TEXT NOT NULL,  -- JSON格式
14  
15  -- 处理状态
16  status TEXT NOT NULL DEFAULT 'pending'
17    CHECK(status IN ('pending', 'processing', 'completed', 'failed')),
18  
19  -- 重试机制
20  retry_count INTEGER NOT NULL DEFAULT 0,
21  max_retries INTEGER NOT NULL DEFAULT 3,
22  
23  -- 时间戳
24  created_at_epoch INTEGER NOT NULL,
25  updated_at_epoch INTEGER NOT NULL,
26  processed_at_epoch INTEGER,
27  
28  -- 错误信息
29  error_message TEXT,
30  
31  FOREIGN KEY(memory_session_id) REFERENCES server_sessions(memory_session_id)
32);
33
34-- 索引
35CREATE INDEX idx_pending_messages_status ON pending_messages(status);
36CREATE INDEX idx_pending_messages_session ON pending_messages(memory_session_id);
37CREATE INDEX idx_pending_messages_created ON pending_messages(created_at_epoch);
38

2.2 入队操作

1// PostToolUse Hook调用
2async function enqueueObservation(context: HookContext): Promise<void> {
3  const observation = {
4    memorySessionId: context.memorySessionId,
5    projectId: context.projectId,
6    type: context.toolName,
7    title: generateTitle(context),
8    narrative: generateNarrative(context),
9    facts: extractFacts(context),
10    filesModified: context.filesModified
11  };
12  
13  // 插入pending_messages表
14  await db.prepare(`
15    INSERT INTO pending_messages 
16    (id, message_type, memory_session_id, project_id, payload, status, created_at_epoch, updated_at_epoch)
17    VALUES (?, 'observation', ?, ?, ?, 'pending', ?, ?)
18  `).run(
19    generateId(),
20    observation.memorySessionId,
21    observation.projectId,
22    JSON.stringify(observation),
23    Date.now(),
24    Date.now()
25  );
26}
27

关键点

  • 插入即返回,不等待处理
  • 使用事务保证原子性
  • payload为完整的消息内容

2.3 出队操作

1// Worker后台处理
2class MessageProcessor {
3  async processNextBatch(): Promise<void> {
4    // 获取待处理消息
5    const messages = await db.prepare(`
6      SELECT * FROM pending_messages 
7      WHERE status = 'pending' 
8      ORDER BY created_at_epoch ASC 
9      LIMIT 10
10    `).all();
11    
12    if (messages.length === 0) return;
13    
14    // 标记为处理中
15    const ids = messages.map(m => m.id);
16    await db.prepare(`
17      UPDATE pending_messages 
18      SET status = 'processing', updated_at_epoch = ?
19      WHERE id IN (${ids.map(() => '?').join(',')})
20    `).run(Date.now(), ...ids);
21    
22    // 处理每个消息
23    for (const message of messages) {
24      try {
25        await this.processMessage(message);
26        
27        // 标记为完成
28        await db.prepare(`
29          UPDATE pending_messages 
30          SET status = 'completed', processed_at_epoch = ?, updated_at_epoch = ?
31          WHERE id = ?
32        `).run(Date.now(), Date.now(), message.id);
33        
34      } catch (error) {
35        await this.handleFailure(message, error);
36      }
37    }
38  }
39  
40  async processMessage(message: PendingMessage): Promise<void> {
41    const payload = JSON.parse(message.payload);
42    
43    switch (message.message_type) {
44      case 'observation':
45        await this.processObservation(payload);
46        break;
47      case 'summary':
48        await this.processSummary(payload);
49        break;
50      case 'session_init':
51        await this.processSessionInit(payload);
52        break;
53      case 'session_complete':
54        await this.processSessionComplete(payload);
55        break;
56    }
57  }
58}
59

2.4 失败处理

1async handleFailure(message: PendingMessage, error: Error): Promise<void> {
2  const newRetryCount = message.retry_count + 1;
3  
4  if (newRetryCount >= message.max_retries) {
5    // 超过重试次数,标记为失败
6    await db.prepare(`
7      UPDATE pending_messages 
8      SET status = 'failed', 
9          retry_count = ?,
10          error_message = ?,
11          updated_at_epoch = ?
12      WHERE id = ?
13    `).run(newRetryCount, error.message, Date.now(), message.id);
14    
15    // 记录失败事件
16    await logEvent({
17      type: 'message_processing_failed',
18      messageId: message.id,
19      error: error.message
20    });
21    
22  } else {
23    // 重新入队,等待重试
24    await db.prepare(`
25      UPDATE pending_messages 
26      SET status = 'pending',
27          retry_count = ?,
28          error_message = ?,
29          updated_at_epoch = ?
30      WHERE id = ?
31    `).run(newRetryCount, error.message, Date.now(), message.id);
32  }
33}
34

三、Generator重启机制

3.1 什么是Generator?

Claude-Mem使用Generator模式处理流式响应:

1// SDK Agent的Generator
2async function* processSession(memorySessionId: string) {
3  // 持续监听新消息
4  while (true) {
5    const messages = await getPendingMessages(memorySessionId);
6    
7    if (messages.length === 0) {
8      // 没有新消息,等待
9      await sleep(1000);
10      continue;
11    }
12    
13    // 处理消息并yield结果
14    for (const message of messages) {
15      const result = await processMessage(message);
16      yield result;
17    }
18  }
19}
20

3.2 Generator崩溃场景

1场景1:网络超时
2  Generator正在处理消息
3  → 调用OpenAI API超时
4  → Generator抛出异常
5  → 需要重启
6
7场景2:内存不足
8  Generator处理大量消息
9  → 内存占用过高
10  → 进程被OOM Killer终止
11  → 需要重启
12
13场景3:数据库锁定
14  Generator写入SQLite
15  → 数据库被锁定
16  → 抛出SQLITE_BUSY错误
17  → 需要重启
18

3.3 重启策略:指数退避

1class GeneratorManager {
2  private restartCount = 0;
3  private maxConsecutiveRestarts = 3;
4  private generator: AsyncGenerator | null = null;
5  
6  async start(memorySessionId: string): Promise<void> {
7    while (true) {
8      try {
9        this.generator = processSession(memorySessionId);
10        
11        // 消费Generator
12        for await (const result of this.generator) {
13          // 处理结果
14          await this.handleResult(result);
15          
16          // 成功处理,重置计数器
17          this.restartCount = 0;
18        }
19        
20        // Generator正常结束
21        break;
22        
23      } catch (error) {
24        this.restartCount++;
25        
26        if (this.restartCount > this.maxConsecutiveRestarts) {
27          // 连续重启过多,停止
28          console.error('Too many consecutive restarts, stopping');
29          break;
30        }
31        
32        // 指数退避:1s, 2s, 4s, 8s...
33        const delay = Math.pow(2, this.restartCount - 1) * 1000;
34        console.log(`Generator crashed, restarting in ${delay}ms...`);
35        await sleep(delay);
36      }
37    }
38  }
39}
40

关键设计

1成功处理消息 → restartCount = 0
2Generator崩溃 → restartCount++
3               → 等待 2^(n-1) 秒
4               → 重启Generator
5
6连续重启 > 3次 → 停止,让迭代器结束
7

3.4 为什么这样设计?

1场景:临时网络问题
2
3传统方式:
4  Generator崩溃 → 立即重启 → 又崩溃 → 立即重启...
5  → 快速消耗资源,问题未解决
6
7指数退避:
8  Generator崩溃 → 等1秒 → 重启
9  又崩溃 → 等2秒 → 重启
10  又崩溃 → 等4秒 → 重启
11  网络恢复 → 成功 → 重置计数器
12  
13  → 给系统恢复的时间
14  → 避免资源浪费
15

3.5 消息不丢失保证

1Generator崩溃时:
2  1. 正在处理的消息:
3     - 状态是'processing'
4     - 重启后会重新获取(因为超时机制)
5  
6  2. 队列中的消息:
7     - 状态是'pending'
8     - 重启后正常处理
9  
10  3. 已处理的消息:
11     - 状态是'completed'
12     - 不会被重复处理
13

四、优雅降级设计

4.1 错误分类

1// 错误类型定义
2enum ErrorType {
3  TRANSPORT = 'transport',     // 传输错误
4  CLIENT = 'client',           // 客户端错误
5  SERVER = 'server',           // 服务端错误
6  TIMEOUT = 'timeout',         // 超时
7  UNKNOWN = 'unknown'          // 未知错误
8}
9
10// 错误分类逻辑
11function classifyError(error: Error): ErrorType {
12  if (error.code === 'ECONNREFUSED' || error.code === 'ENOTFOUND') {
13    return ErrorType.TRANSPORT;
14  }
15  
16  if (error.statusCode >= 400 && error.statusCode < 500) {
17    return ErrorType.CLIENT;
18  }
19  
20  if (error.statusCode >= 500) {
21    return ErrorType.SERVER;
22  }
23  
24  if (error.name === 'TimeoutError') {
25    return ErrorType.TIMEOUT;
26  }
27  
28  return ErrorType.UNKNOWN;
29}
30

4.2 降级策略

1// Hook中的错误处理
2async function hookWithErrorHandling(
3  hookName: string,
4  handler: () => Promise<HookResponse>
5): Promise<HookResponse> {
6  try {
7    return await handler();
8    
9  } catch (error) {
10    const errorType = classifyError(error);
11    
12    switch (errorType) {
13      case ErrorType.TRANSPORT:
14        // Worker不可用
15        log.warn(`${hookName}: Worker unavailable, continuing without memory`);
16        return { continue: true };  // 不阻塞
17        
18      case ErrorType.TIMEOUT:
19        // 超时
20        log.warn(`${hookName}: Timeout, continuing without memory`);
21        return { continue: true };  // 不阻塞
22        
23      case ErrorType.SERVER:
24        // 服务端错误
25        log.error(`${hookName}: Server error`, error);
26        return { continue: true };  // 不阻塞
27        
28      case ErrorType.CLIENT:
29        // 客户端错误:可能是bug
30        log.error(`${hookName}: Client error`, error);
31        return { 
32          continue: false, 
33          error: `Hook error: ${error.message}` 
34        };  // 阻塞,提示修复
35        
36      default:
37        // 未知错误:保守处理
38        log.error(`${hookName}: Unknown error`, error);
39        return { continue: true };
40    }
41  }
42}
43

4.3 降级矩阵

错误类型是否阻塞Agent原因
传输错误❌ 不阻塞Worker不可用,但Agent仍可工作
超时❌ 不阻塞网络问题,不应影响用户体验
服务端错误❌ 不阻塞服务端问题,用户无法解决
客户端错误✅ 阻塞可能是配置或代码bug,需要修复
未知错误❌ 不阻塞保守处理,避免意外阻塞

4.4 为什么”永不阻塞”?

1场景:Worker服务崩溃
2
3如果阻塞:
4  用户:输入提示
5  Claude Code:等待Hook响应... (卡住)
6  用户:😭 什么都做不了,只能重启Claude Code
7  
8如果优雅降级:
9  用户:输入提示
10  Claude Code:Hook失败,继续执行(无记忆注入)
11  用户:😊 正常工作,只是这次没有历史上下文
12  用户:可以稍后手动重启Worker
13

核心洞察:记忆是增强,不是必需。没有记忆,Agent仍然可以工作。

五、Session生命周期管理

5.1 状态机

1┌─────────────────────────────────────────────────────────────────┐
2│                    Session状态机                                 │
3├─────────────────────────────────────────────────────────────────┤
4│                                                                   │
5│   [created] ──→ [active] ──→ [completed]                        │
6│                  │                                               │
7│                  └──→ [failed]                                   │
8│                                                                   │
9│   状态转换:                                                      │
10│   - created → active: UserPromptSubmit Hook                     │
11│   - active → completed: SessionEnd Hook (成功)                  │
12│   - active → failed: SessionEnd Hook (失败) 或 超时              │
13│                                                                   │
14└─────────────────────────────────────────────────────────────────┘
15

5.2 超时处理

1// 会话超时检测
2class SessionTimeoutChecker {
3  private checkInterval = 60000;  // 1分钟
4  private sessionTimeout = 3600000;  // 1小时
5  
6  async check(): Promise<void> {
7    const cutoffEpoch = Date.now() - this.sessionTimeout;
8    
9    // 查找超时的活跃会话
10    const timeoutSessions = await db.prepare(`
11      SELECT memory_session_id FROM server_sessions
12      WHERE status = 'active'
13      AND updated_at_epoch < ?
14    `).all(cutoffEpoch);
15    
16    // 标记为失败
17    for (const session of timeoutSessions) {
18      await db.prepare(`
19        UPDATE server_sessions
20        SET status = 'failed', completed_at_epoch = ?, updated_at_epoch = ?
21        WHERE memory_session_id = ?
22      `).run(Date.now(), Date.now(), session.memory_session_id);
23      
24      log.warn(`Session ${session.memory_session_id} timed out`);
25    }
26  }
27}
28

5.3 队列排空

1// Session结束时排空队列
2async function drainQueue(memorySessionId: string): Promise<void> {
3  const maxWaitTime = 30000;  // 最多等待30秒
4  const startTime = Date.now();
5  
6  while (true) {
7    // 检查是否还有待处理消息
8    const pendingCount = await db.prepare(`
9      SELECT COUNT(*) as count FROM pending_messages
10      WHERE memory_session_id = ?
11      AND status IN ('pending', 'processing')
12    `).get(memorySessionId);
13    
14    if (pendingCount.count === 0) {
15      // 队列已空
16      break;
17    }
18    
19    if (Date.now() - startTime > maxWaitTime) {
20      // 超时,强制完成
21      log.warn(`Queue drain timeout for session ${memorySessionId}`);
22      break;
23    }
24    
25    // 等待一下再检查
26    await sleep(1000);
27  }
28}
29

六、监控与告警

6.1 队列监控

1// 队列健康检查
2interface QueueHealth {
3  pending: number;
4  processing: number;
5  failed: number;
6  oldestPendingAge: number;  // 最老的pending消息的年龄(秒)
7}
8
9async function getQueueHealth(): Promise<QueueHealth> {
10  const stats = await db.prepare(`
11    SELECT 
12      status,
13      COUNT(*) as count,
14      MIN(created_at_epoch) as oldest
15    FROM pending_messages
16    WHERE status IN ('pending', 'processing', 'failed')
17    GROUP BY status
18  `).all();
19  
20  const result: QueueHealth = {
21    pending: 0,
22    processing: 0,
23    failed: 0,
24    oldestPendingAge: 0
25  };
26  
27  for (const stat of stats) {
28    result[stat.status] = stat.count;
29    if (stat.status === 'pending' && stat.oldest) {
30      result.oldestPendingAge = (Date.now() - stat.oldest) / 1000;
31    }
32  }
33  
34  return result;
35}
36

6.2 告警规则

1// 告警检查
2async function checkAlerts(): Promise<Alert[]> {
3  const alerts: Alert[] = [];
4  const health = await getQueueHealth();
5  
6  // 规则1:pending消息过多
7  if (health.pending > 100) {
8    alerts.push({
9      level: 'warning',
10      message: `Pending queue has ${health.pending} messages`
11    });
12  }
13  
14  // 规则2:failed消息过多
15  if (health.failed > 10) {
16    alerts.push({
17      level: 'error',
18      message: `${health.failed} messages failed to process`
19    });
20  }
21  
22  // 规则3:最老消息过旧
23  if (health.oldestPendingAge > 300) {  // 5分钟
24    alerts.push({
25      level: 'warning',
26      message: `Oldest pending message is ${health.oldestPendingAge}s old`
27    });
28  }
29  
30  return alerts;
31}
32

七、性能优化

7.1 批量处理

1// 批量处理消息
2async function processBatch(messages: PendingMessage[]): Promise<void> {
3  // 按类型分组
4  const grouped = groupBy(messages, 'message_type');
5  
6  // 批量处理每种类型
7  for (const [type, typeMessages] of Object.entries(grouped)) {
8    switch (type) {
9      case 'observation':
10        await processObservationsBatch(typeMessages);
11        break;
12      case 'summary':
13        await processSummariesBatch(typeMessages);
14        break;
15    }
16  }
17}
18
19// 批量插入memory_items
20async function processObservationsBatch(messages: PendingMessage[]): Promise<void> {
21  const observations = messages.map(m => JSON.parse(m.payload));
22  
23  const insertMany = db.transaction(() => {
24    const stmt = db.prepare(`
25      INSERT INTO memory_items 
26      (id, project_id, server_session_id, kind, type, title, narrative, facts, concepts, created_at_epoch)
27      VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
28    `);
29    
30    for (const obs of observations) {
31      stmt.run(
32        generateId(), obs.projectId, obs.sessionId,
33        'observation', obs.type, obs.title, obs.narrative,
34        JSON.stringify(obs.facts), JSON.stringify(obs.concepts),
35        Date.now()
36      );
37    }
38  });
39  
40  insertMany();
41}
42

7.2 连接池

1// SQLite连接池
2class SQLitePool {
3  private pool: Database[] = [];
4  private maxConnections = 5;
5  
6  async acquire(): Promise<Database> {
7    if (this.pool.length > 0) {
8      return this.pool.pop()!;
9    }
10    
11    if (this.pool.length < this.maxConnections) {
12      return createConnection();
13    }
14    
15    // 等待可用连接
16    return new Promise((resolve) => {
17      const check = () => {
18        if (this.pool.length > 0) {
19          resolve(this.pool.pop()!);
20        } else {
21          setTimeout(check, 10);
22        }
23      };
24      check();
25    });
26  }
27  
28  release(conn: Database): void {
29    this.pool.push(conn);
30  }
31}
32

八、总结

Claude-Mem的异步队列与容错机制确保了:

  1. 永不阻塞Agent:Hook快速返回,后台异步处理
  2. 消息不丢失:pending_messages表保证持久化
  3. 优雅降级:错误分类处理,传输错误不阻塞
  4. 自动恢复:Generator重启机制,指数退避
  5. 最终一致性:队列排空保证数据完整

关键设计原则

  • 记忆是增强,不是必需
  • 失败不应传播到用户
  • 最终一致性优于实时一致性

下一篇Claude-Mem系列(6):应用场景与最佳实践