From 045e0b2501d0c20387d768c7ecdb91ab95300860 Mon Sep 17 00:00:00 2001 From: wangdl Date: Thu, 11 Jun 2026 20:35:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20AI=20Job=20=E7=8A=B6=E6=80=81=E6=9C=BA?= =?UTF-8?q?=E4=B8=8E=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E8=AE=BE=E8=AE=A1?= =?UTF-8?q?=20(API-AI-002)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 定义 5 种 Job 类型、7 种状态、完整状态流转图、数据库字段、防并发锁定 机制、retryable/non-retryable 分类、超时释放、幂等规则、Poll 调度策略。 Co-Authored-By: Claude Opus 4.7 --- docs/ai-job-state-machine.md | 233 +++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 docs/ai-job-state-machine.md diff --git a/docs/ai-job-state-machine.md b/docs/ai-job-state-machine.md new file mode 100644 index 0000000..cde1d96 --- /dev/null +++ b/docs/ai-job-state-machine.md @@ -0,0 +1,233 @@ +# AI Job 状态机与任务调度设计 + +## 1. Job 类型 + +| jobType | 说明 | 输入 | 输出 | +|---------|------|------|------| +| `learning_state_analysis` | 学习状态分析 | Snapshot | AiLearningAnalysis | +| `weak_point_analysis` | 薄弱点分析 | Snapshot | WeakPointCandidate[] | +| `next_action_planning` | 下一步建议 | Snapshot | NextActionRecommendation[] | +| `quiz_generation` | 题目候选生成 | Snapshot + params | QuizQuestion[] | +| `flashcard_generation` | 卡片候选生成 | Snapshot + params | Flashcard[] | + +## 2. 状态定义 + +| 状态 | 含义 | 进入条件 | 退出条件 | +|------|------|----------|----------| +| `pending` | 等待消费 | API 创建 or retryable fail 回退 | 被 Runtime lock | +| `locked` | 已被 Runtime 获取 | Runtime POST /lock 成功 | lockUntil 超时 → expired / Runtime 开始执行 | +| `running` | 正在执行 | Runtime 开始执行(heartbeat 或隐式) | 执行完成 → succeeded/failed | +| `succeeded` | 执行成功 | API POST /result 处理完毕 | 终态 | +| `failed` | 执行失败 | non-retryable 错误 or 超过 maxRetryCount | 终态(除非 Admin 重跑) | +| `cancelled` | 已取消 | 用户/Admin 取消 pending job | 终态 | +| `expired` | 超时 | lockUntil 超时未 heartbeat or 执行超时 | 可被 Runtime 重新 poll(retryable) | + +## 3. 状态流转 + +``` + ┌──────────┐ + │ pending │ ←──────────────────────┐ + └────┬─────┘ │ + │ │ + POST /lock │ + │ │ + ┌────▼─────┐ │ + ┌───→│ locked │──→ expired ───────────┘ + │ └────┬─────┘ (lockUntil 超时) + │ │ + │ heartbeat + │ │ + │ ┌────▼─────┐ + │ │ running │──→ expired ───────────┘ + │ └────┬─────┘ (timeoutSeconds 超时) + │ │ + ┌───────┼─────────┼──────────┐ + │ │ │ │ + succeeded failed failed cancelled + (result) (non- (retry- (用户/Admin + retry) able 取消pending) + │ + └──→ pending (retryCount++) +``` + +## 4. 数据库字段 + +```prisma +model AiRuntimeJob { + id String @id @default(cuid()) + userId String + jobType String // learning_state_analysis | weak_point_analysis | next_action_planning | quiz_generation | flashcard_generation + targetType String // user | material | knowledge_point + targetId String + snapshotId String? + status String @default("pending") // pending | locked | running | succeeded | failed | cancelled | expired + priority Int @default(0) // 0=最高 + idempotencyKey String? @unique + apiKeyMode String @default("platform_key") // platform_key | user_deepseek_key + credentialId String? + modelProvider String @default("deepseek") + modelName String @default("deepseek-chat") + promptVersion String? + outputSchemaVersion String? + attemptNo Int @default(0) + retriedFromJobId String? + + // 锁定 + lockedBy String? // runtimeInstanceId + lockedAt DateTime? + lockUntil DateTime? + + // 时间 + startedAt DateTime? + finishedAt DateTime? + + // 重试 + retryCount Int @default(0) + maxRetryCount Int @default(3) + timeoutSeconds Int @default(120) + + // 错误 + errorCode String? + errorMessage String? + + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt + + result AiRuntimeResult? + + @@index([status]) + @@index([jobType]) + @@index([userId]) + @@index([targetType, targetId]) + @@index([lockUntil]) +} +``` + +## 5. 锁定机制 + +### 5.1 Lock 流程 + +``` +Runtime POST /internal/runtime/jobs/{jobId}/lock + → API 检查 job.status === pending + → API 检查 job.lockUntil < now (未被其他 Runtime 持有) + → API 设置 lockedBy, lockedAt, lockUntil=now+60s, status=locked + → 返回 lockUntil +``` + +### 5.2 防并发 + +基于数据库行级写操作保证只有一个 Runtime 锁定成功: +- `UPDATE ... WHERE status='pending' AND (lockUntil IS NULL OR lockUntil < NOW())` +- 影响行数 = 0 则锁定失败(JOB_ALREADY_LOCKED) + +### 5.3 Heartbeat + +``` +Runtime POST /internal/runtime/jobs/{jobId}/heartbeat + → API 检查 lockedBy === runtimeInstanceId + → API 延长 lockUntil = now + 60s + → 204 No Content +``` + +### 5.4 超时释放 + +`lockUntil` 超时后: +- 原 Runtime 的 lock 失效 +- job 状态变为 `expired` +- 其他 Runtime poll 时可重新获取(retryable) +- 如 retryCount < maxRetryCount,job 自动回到 `pending` + +## 6. 重试策略 + +### 6.1 重试触发 + +| 场景 | 处理 | +|------|------| +| Runtime 提交 retryable fail | job → pending, retryCount++ | +| Runtime lock 后无 heartbeat 超时 | job → expired → pending, retryCount++ | +| Runtime 执行超时 | job → expired → pending, retryCount++ | + +### 6.2 重试上限 + +- `retryCount >= maxRetryCount`:job → failed(终态) +- `maxRetryCount` 默认 3,可配置 +- Admin 可手动重跑 failed job(创建新 job,记录 retriedFromJobId) + +### 6.3 retryable vs non-retryable + +| 错误类型 | retryable | 示例 | +|---------|-----------|------| +| MODEL_TIMEOUT | true | DeepSeek 超时 | +| MODEL_RATE_LIMIT | true | 限流 | +| NETWORK_ERROR | true | 网络中断 | +| TEMPORARY_PROVIDER_ERROR | true | 5xx | +| INVALID_SNAPSHOT | false | 快照结构错 | +| INVALID_SCHEMA | false | 输出 schema 错 | +| INVALID_CREDENTIAL | false | Key 无效 | +| JOB_TIMEOUT | true | 执行超时 | + +## 7. 超时 + +| 超时类型 | 默认值 | 说明 | +|---------|--------|------| +| lockUntil | 60s | lock 后未 heartbeat 自动释放 | +| timeoutSeconds | 120s | 总执行超时 | +| heartbeat 间隔 | Runtime 自行决定 | 建议 15-30s | + +## 8. 幂等 + +### 8.1 Job 创建幂等 + +`idempotencyKey` 唯一索引:相同 `userId + jobType + targetType + targetId + idempotencyKey` 的 job 不重复创建。如果没有传 idempotencyKey,则允许重复创建。 + +### 8.2 Result 提交幂等 + +``` +resultIdempotencyKey = jobId + ":" + attemptNo + ":" + outputHash +``` + +- 相同 key 重复提交:返回 200(幂等,不重复落库) +- 已有 succeeded result 但 outputHash 不同:返回 409 RESULT_ALREADY_EXISTS + +### 8.3 Admin 重跑 + +Admin 重跑创建新 job,记录 `retriedFromJobId`,不复用旧 job。 + +## 9. Cancelled / Expired + +| 状态 | 能否被 Runtime 消费 | 处理 | +|------|-------------------|------| +| cancelled | 否 | API 直接设置,不进入 poll 结果 | +| expired | 是(如 retryable) | lockUntil 超时后自动变为 expired,retryable 时回到 pending | + +用户关闭 AI 授权时: +- 所有 pending job → cancelled +- 所有 running job → cancelRequested(Runtime 下次 heartbeat 获知) + +## 10. 任务调度 + +### Poll 规则 + +``` +POST /internal/runtime/jobs/poll + → 返回 status=pending 的 job + → 按 priority ASC, createdAt ASC 排序 + → 只返回 Runtime capabilities 支持的 jobType + → limit 最大 50 +``` + +### 无可用 job 时 + +返回空数组。Runtime 按 pollIntervalMs 等待后重试。 + +## 11. 验收清单 + +- [x] 输出 Job 状态机设计文档 +- [x] 明确每个状态的进入条件和退出条件 +- [x] 明确 Runtime 如何锁定任务(DB 行级写 + lockUntil) +- [x] 明确 lockUntil 超时后如何释放 +- [x] 明确 retryCount / maxRetryCount 规则 +- [x] 明确 idempotencyKey 防重复 +- [x] 明确 Admin 可重跑 failed job +- [x] 明确 cancelled / expired 不应被 Runtime 再次消费