feat: Runtime Internal API (API-AI-028~034)
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 46s

- RuntimeInternalController: 7 个 /internal/runtime/* 端点
- RuntimeInternalService: Poll/Lock/Heartbeat/Snapshot/Credential/Result/Fail/Log
- 复用 InternalAuthGuard + x-runtime-instance-id 追踪
- Job lock 防并发 (updateMany WHERE status=pending)
- Result 幂等 (resultIdempotencyKey)
- Failure retryable/non-retryable 处理

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
wangdl 2026-06-11 21:23:52 +08:00
parent 87fe180874
commit 43e6e9029c
3 changed files with 350 additions and 3 deletions

View File

@ -4,11 +4,13 @@ import { PrismaModule } from '../../infrastructure/database/prisma.module';
import { UserAiController } from './user-ai.controller'; import { UserAiController } from './user-ai.controller';
import { UserAiService } from './user-ai.service'; import { UserAiService } from './user-ai.service';
import { CredentialEncryptionService } from './credential-encryption.service'; import { CredentialEncryptionService } from './credential-encryption.service';
import { RuntimeInternalController } from './internal/runtime-internal.controller';
import { RuntimeInternalService } from './internal/runtime-internal.service';
@Module({ @Module({
imports: [ConfigModule, PrismaModule], imports: [ConfigModule, PrismaModule],
controllers: [UserAiController], controllers: [UserAiController, RuntimeInternalController],
providers: [UserAiService, CredentialEncryptionService], providers: [UserAiService, CredentialEncryptionService, RuntimeInternalService],
exports: [UserAiService, CredentialEncryptionService], exports: [UserAiService, CredentialEncryptionService, RuntimeInternalService],
}) })
export class AiRuntimeModule {} export class AiRuntimeModule {}

View File

@ -0,0 +1,77 @@
import { Controller, Get, Post, Param, Body, Req, UseGuards, HttpCode, HttpStatus } from '@nestjs/common';
import { InternalAuthGuard } from '../../../common/guards/internal-auth.guard';
import { RuntimeInternalService } from './runtime-internal.service';
import {
RuntimePollJobsRequestDto, RuntimeLockJobRequestDto, RuntimeHeartbeatRequestDto,
RuntimeResolveCredentialRequestDto, RuntimeSubmitResultRequestDto,
RuntimeSubmitFailureRequestDto, RuntimeSubmitInvocationLogsRequestDto,
} from './dto/runtime-internal.dto';
@Controller('internal/runtime')
@UseGuards(InternalAuthGuard)
export class RuntimeInternalController {
constructor(private readonly service: RuntimeInternalService) {}
private instanceId(req: any): string {
return (req.headers['x-runtime-instance-id'] as string) || 'unknown';
}
// ── Poll ──
@Post('jobs/poll')
async pollJobs(@Req() req: any, @Body() dto: RuntimePollJobsRequestDto) {
return this.service.pollJobs(this.instanceId(req), dto.supportedJobTypes, dto.limit ?? 5, dto.capabilities);
}
// ── Lock ──
@Post('jobs/:jobId/lock')
async lockJob(@Req() req: any, @Param('jobId') jobId: string, @Body() dto: RuntimeLockJobRequestDto) {
return this.service.lockJob(jobId, dto.runtimeInstanceId || this.instanceId(req));
}
// ── Heartbeat ──
@Post('jobs/:jobId/heartbeat')
@HttpCode(HttpStatus.NO_CONTENT)
async heartbeatJob(@Req() req: any, @Param('jobId') jobId: string, @Body() dto: RuntimeHeartbeatRequestDto) {
await this.service.heartbeatJob(jobId, dto.runtimeInstanceId || this.instanceId(req));
}
// ── Snapshot ──
@Get('jobs/:jobId/snapshot')
async getSnapshot(@Param('jobId') jobId: string) {
return this.service.getSnapshot(jobId);
}
// ── Credential Resolve ──
@Post('model-credentials/resolve')
async resolveCredential(@Body() dto: RuntimeResolveCredentialRequestDto) {
return this.service.resolveCredential(dto.jobId, dto.apiKeyMode, dto.provider, dto.credentialId);
}
// ── Submit Result ──
@Post('jobs/:jobId/result')
@HttpCode(HttpStatus.CREATED)
async submitResult(@Param('jobId') jobId: string, @Body() dto: RuntimeSubmitResultRequestDto) {
return this.service.submitResult(jobId, dto);
}
// ── Submit Failure ──
@Post('jobs/:jobId/fail')
async submitFailure(@Param('jobId') jobId: string, @Body() dto: RuntimeSubmitFailureRequestDto) {
return this.service.submitFailure(jobId, dto);
}
// ── Submit Invocation Logs ──
@Post('invocation-logs')
@HttpCode(HttpStatus.CREATED)
async submitInvocationLogs(@Body() dto: RuntimeSubmitInvocationLogsRequestDto) {
return this.service.submitInvocationLogs(dto.logs);
}
}

View File

@ -0,0 +1,268 @@
import { Injectable, NotFoundException, ConflictException, BadRequestException } from '@nestjs/common';
import { PrismaService } from '../../../infrastructure/database/prisma.service';
import { UserAiService } from '../user-ai.service';
@Injectable()
export class RuntimeInternalService {
constructor(
private readonly prisma: PrismaService,
private readonly userAi: UserAiService,
) {}
// ── Poll ──
async pollJobs(runtimeInstanceId: string, supportedJobTypes: string[], limit: number, capabilities?: Record<string, unknown>) {
const jobs = await this.prisma.aiRuntimeJob.findMany({
where: {
status: 'pending',
jobType: { in: supportedJobTypes },
},
orderBy: [{ priority: 'asc' }, { createdAt: 'asc' }],
take: Math.min(limit || 5, 50),
select: {
id: true, jobType: true, targetType: true, targetId: true,
priority: true, snapshotId: true, promptVersion: true, outputSchemaVersion: true,
},
});
return { jobs };
}
// ── Lock ──
async lockJob(jobId: string, runtimeInstanceId: string) {
const now = new Date();
const lockUntil = new Date(now.getTime() + 60_000);
const result = await this.prisma.aiRuntimeJob.updateMany({
where: {
id: jobId,
status: 'pending',
OR: [
{ lockUntil: null },
{ lockUntil: { lt: now } },
],
},
data: {
status: 'locked',
lockedBy: runtimeInstanceId,
lockedAt: now,
lockUntil,
},
});
if (result.count === 0) {
throw new ConflictException({
errorCode: 'JOB_ALREADY_LOCKED',
message: 'Job is already locked or not in pending status',
});
}
return { jobId, status: 'locked', lockUntil: lockUntil.getTime() };
}
// ── Heartbeat ──
async heartbeatJob(jobId: string, runtimeInstanceId: string) {
const now = new Date();
const lockUntil = new Date(now.getTime() + 60_000);
const result = await this.prisma.aiRuntimeJob.updateMany({
where: { id: jobId, lockedBy: runtimeInstanceId, status: 'locked' },
data: { lockUntil },
});
if (result.count === 0) {
throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found or not locked by this runtime' });
}
}
// ── Snapshot ──
async getSnapshot(jobId: string) {
const job = await this.prisma.aiRuntimeJob.findUnique({
where: { id: jobId },
select: { id: true, snapshotId: true },
});
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
if (!job.snapshotId) throw new NotFoundException({ errorCode: 'SNAPSHOT_NOT_FOUND', message: 'No snapshot bound to this job' });
const snapshot = await this.prisma.learningAnalysisSnapshot.findUnique({
where: { id: job.snapshotId },
});
if (!snapshot) throw new NotFoundException({ errorCode: 'SNAPSHOT_NOT_FOUND', message: 'Snapshot not found' });
if (snapshot.expiresAt && new Date(snapshot.expiresAt) < new Date()) {
throw new NotFoundException({ errorCode: 'SNAPSHOT_EXPIRED', message: 'Snapshot has expired for this job' });
}
return {
jobId: job.id,
snapshotId: snapshot.id,
snapshotVersion: snapshot.snapshotVersion,
privacyScope: snapshot.privacyScope,
userProfile: snapshot.userProfile,
aiSettings: snapshot.aiSettings,
deviceContext: snapshot.deviceContext,
learningBehaviorSummary: snapshot.learningBehaviorSummary,
materialProgressSummary: snapshot.materialProgressSummary,
contentStructureSummary: snapshot.contentStructureSummary,
behaviorSignals: snapshot.behaviorSignals,
scoreSignals: snapshot.scoreSignals,
constraints: snapshot.constraints,
allowedModelFields: snapshot.allowedModelFields,
};
}
// ── Credential Resolve ──
async resolveCredential(jobId: string, apiKeyMode: string, provider: string, credentialId?: string) {
const job = await this.prisma.aiRuntimeJob.findUnique({
where: { id: jobId },
select: { userId: true },
});
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
if (apiKeyMode === 'user_deepseek_key') {
if (!credentialId) throw new BadRequestException({ errorCode: 'CREDENTIAL_NOT_FOUND', message: 'credentialId required for user_deepseek_key mode' });
const { provider: resolvedProvider, apiKey } = await this.userAi.resolveCredentialForJob(job.userId, credentialId);
return { provider: resolvedProvider, model: 'deepseek-chat', apiKey, apiKeyMode: 'user_deepseek_key' };
}
// platform_key: Runtime should use its own env var as primary; API returns empty key as signal
return { provider, model: 'deepseek-chat', apiKey: '', apiKeyMode: 'platform_key' };
}
// ── Result ──
async submitResult(jobId: string, dto: {
runtimeInstanceId: string; schemaVersion: string; status: string;
rawOutput?: any; validatedOutput?: any; validationErrors?: string[];
usage?: any; attemptNo: number; outputHash?: string;
}) {
const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } });
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
const resultIdempotencyKey = `${jobId}:${dto.attemptNo}:${dto.outputHash ?? ''}`;
// Check duplicate
const existing = await this.prisma.aiRuntimeResult.findFirst({
where: { resultIdempotencyKey },
});
if (existing) return { status: 'ok', duplicate: true };
// Check already succeeded with different hash
const existingResult = await this.prisma.aiRuntimeResult.findUnique({ where: { jobId } });
if (existingResult && existingResult.status === 'succeeded') {
throw new ConflictException({ errorCode: 'RESULT_ALREADY_EXISTS', message: 'Job already has a succeeded result' });
}
await this.prisma.aiRuntimeResult.create({
data: {
jobId, userId: job.userId,
runtimeInstanceId: dto.runtimeInstanceId,
status: dto.status,
attemptNo: dto.attemptNo,
resultIdempotencyKey,
outputHash: dto.outputHash,
rawOutput: dto.rawOutput as any,
validatedOutput: dto.validatedOutput as any,
schemaVersion: dto.schemaVersion,
validationErrors: dto.validationErrors as any,
},
});
await this.prisma.aiRuntimeJob.update({
where: { id: jobId },
data: { status: 'succeeded', finishedAt: new Date() },
});
return { status: 'ok', duplicate: false };
}
// ── Fail ──
async submitFailure(jobId: string, dto: {
runtimeInstanceId: string; errorCode: string; errorMessage: string;
retryable: boolean; rawError?: string;
}) {
const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } });
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
const newRetryCount = job.retryCount + 1;
const exceeded = newRetryCount > job.maxRetryCount;
if (dto.retryable && !exceeded) {
await this.prisma.aiRuntimeJob.update({
where: { id: jobId },
data: {
status: 'pending',
lockedBy: null, lockedAt: null, lockUntil: null,
retryCount: newRetryCount,
errorCode: dto.errorCode,
errorMessage: dto.errorMessage,
},
});
} else {
await this.prisma.aiRuntimeJob.update({
where: { id: jobId },
data: {
status: 'failed',
finishedAt: new Date(),
retryCount: newRetryCount,
errorCode: dto.errorCode,
errorMessage: dto.errorMessage,
},
});
}
return { status: exceeded ? 'failed' : 'pending', retryCount: newRetryCount };
}
// ── Invocation Logs ──
async submitInvocationLogs(logs: Array<{
jobId: string; provider: string; model: string; apiKeyMode: string;
credentialId?: string; promptName: string; promptVersion: string;
outputSchemaVersion: string; inputTokens: number; outputTokens: number;
totalTokens: number; latencyMs: number; costEstimate?: number;
success: boolean; errorCode?: string; errorMessage?: string;
retryCount: number; runtimeInstanceId: string;
traceId?: string; correlationId?: string;
}>) {
const created = await Promise.all(
logs.map(async (log) => {
const job = await this.prisma.aiRuntimeJob.findUnique({
where: { id: log.jobId }, select: { userId: true },
});
if (!job) return null;
return this.prisma.modelInvocationLog.create({
data: {
userId: job.userId,
jobId: log.jobId,
provider: log.provider,
model: log.model,
apiKeyMode: log.apiKeyMode,
credentialId: log.credentialId,
promptName: log.promptName,
promptVersion: log.promptVersion,
outputSchemaVersion: log.outputSchemaVersion,
inputTokens: log.inputTokens,
outputTokens: log.outputTokens,
totalTokens: log.totalTokens,
latencyMs: log.latencyMs,
costEstimate: log.costEstimate,
success: log.success,
errorCode: log.errorCode,
errorMessage: log.errorMessage,
retryCount: log.retryCount,
runtimeInstanceId: log.runtimeInstanceId,
traceId: log.traceId,
correlationId: log.correlationId,
},
});
}),
);
return { accepted: created.filter(Boolean).length };
}
}