From cc2ccbad5952205b6ad16206aa5a94d3f89f2603 Mon Sep 17 00:00:00 2001 From: wangdl Date: Thu, 11 Jun 2026 21:38:44 +0800 Subject: [PATCH] fix: audit fixes for Runtime Internal API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. heartbeat: locked→running transition on first heartbeat 2. submitResult: validate job is locked/running before accepting 3. submitFailure: validate job is locked/running before accepting 4. resolveCredentialForJob: update lastUsedAt on credential 5. pollJobs: filter by capabilities (snapshotVersion + outputSchemaVersion) Co-Authored-By: Claude Opus 4.7 --- .../internal/runtime-internal.service.ts | 49 ++++++++++++++++--- src/modules/ai-runtime/user-ai.service.ts | 6 +++ 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/src/modules/ai-runtime/internal/runtime-internal.service.ts b/src/modules/ai-runtime/internal/runtime-internal.service.ts index 2dcb9cf..75f83cd 100644 --- a/src/modules/ai-runtime/internal/runtime-internal.service.ts +++ b/src/modules/ai-runtime/internal/runtime-internal.service.ts @@ -12,11 +12,21 @@ export class RuntimeInternalService { // ── Poll ── async pollJobs(runtimeInstanceId: string, supportedJobTypes: string[], limit: number, capabilities?: Record) { + const supSnapshot = (capabilities?.['supportedSnapshotVersions'] as string[]) ?? []; + const supOutput = (capabilities?.['supportedOutputSchemaVersions'] as string[]) ?? []; + + const where: any = { + status: 'pending', + jobType: { in: supportedJobTypes }, + }; + + // Filter by outputSchemaVersion if Runtime declared its capabilities + if (supOutput.length > 0) { + where.outputSchemaVersion = { in: supOutput }; + } + const jobs = await this.prisma.aiRuntimeJob.findMany({ - where: { - status: 'pending', - jobType: { in: supportedJobTypes }, - }, + where, orderBy: [{ priority: 'asc' }, { createdAt: 'asc' }], take: Math.min(limit || 5, 50), select: { @@ -24,6 +34,22 @@ export class RuntimeInternalService { priority: true, snapshotId: true, promptVersion: true, outputSchemaVersion: true, }, }); + + // Post-filter by snapshotVersion if needed (requires snapshot join) + if (supSnapshot.length > 0) { + const snapshotIds = [...new Set(jobs.map(j => j.snapshotId).filter(Boolean))]; + if (snapshotIds.length > 0) { + const snapshots = await this.prisma.learningAnalysisSnapshot.findMany({ + where: { id: { in: snapshotIds as string[] } }, + select: { id: true, snapshotVersion: true }, + }); + const compatibleIds = new Set( + snapshots.filter(s => supSnapshot.includes(s.snapshotVersion)).map(s => s.id) + ); + return { jobs: jobs.filter(j => !j.snapshotId || compatibleIds.has(j.snapshotId)) }; + } + } + return { jobs }; } @@ -66,9 +92,14 @@ export class RuntimeInternalService { const now = new Date(); const lockUntil = new Date(now.getTime() + 60_000); + // First heartbeat: locked → running; subsequent heartbeats: extend lockUntil const result = await this.prisma.aiRuntimeJob.updateMany({ - where: { id: jobId, lockedBy: runtimeInstanceId, status: 'locked' }, - data: { lockUntil }, + where: { + id: jobId, + lockedBy: runtimeInstanceId, + status: { in: ['locked', 'running'] }, + }, + data: { lockUntil, status: 'running' }, }); if (result.count === 0) { @@ -141,6 +172,9 @@ export class RuntimeInternalService { }) { const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } }); if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' }); + if (job.status !== 'locked' && job.status !== 'running') { + throw new ConflictException({ errorCode: 'JOB_NOT_ACTIVE', message: `Job is ${job.status}, cannot accept result` }); + } const resultIdempotencyKey = `${jobId}:${dto.attemptNo}:${dto.outputHash ?? ''}`; @@ -187,6 +221,9 @@ export class RuntimeInternalService { }) { const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } }); if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' }); + if (job.status !== 'locked' && job.status !== 'running') { + throw new ConflictException({ errorCode: 'JOB_NOT_ACTIVE', message: `Job is ${job.status}, cannot accept failure` }); + } const newRetryCount = job.retryCount + 1; const exceeded = newRetryCount > job.maxRetryCount; diff --git a/src/modules/ai-runtime/user-ai.service.ts b/src/modules/ai-runtime/user-ai.service.ts index 92a0388..f10a7d2 100644 --- a/src/modules/ai-runtime/user-ai.service.ts +++ b/src/modules/ai-runtime/user-ai.service.ts @@ -166,6 +166,12 @@ export class UserAiService { if (!cred) throw new NotFoundException('Credential not found or not active'); const apiKey = this.crypto.decrypt(cred.encryptedApiKey); + + await this.prisma.userModelCredential.update({ + where: { id: credId }, + data: { lastUsedAt: new Date() }, + }); + return { provider: cred.provider, apiKey }; }