fix: audit fixes for Runtime Internal API
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 46s
All checks were successful
Deploy API Server / build-and-deploy (push) Successful in 46s
1. heartbeat: locked→running transition on first heartbeat 2. submitResult: validate job is locked/running before accepting 3. submitFailure: validate job is locked/running before accepting 4. resolveCredentialForJob: update lastUsedAt on credential 5. pollJobs: filter by capabilities (snapshotVersion + outputSchemaVersion) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
5cbf20046a
commit
cc2ccbad59
@ -12,11 +12,21 @@ export class RuntimeInternalService {
|
|||||||
// ── Poll ──
|
// ── Poll ──
|
||||||
|
|
||||||
async pollJobs(runtimeInstanceId: string, supportedJobTypes: string[], limit: number, capabilities?: Record<string, unknown>) {
|
async pollJobs(runtimeInstanceId: string, supportedJobTypes: string[], limit: number, capabilities?: Record<string, unknown>) {
|
||||||
|
const supSnapshot = (capabilities?.['supportedSnapshotVersions'] as string[]) ?? [];
|
||||||
|
const supOutput = (capabilities?.['supportedOutputSchemaVersions'] as string[]) ?? [];
|
||||||
|
|
||||||
|
const where: any = {
|
||||||
|
status: 'pending',
|
||||||
|
jobType: { in: supportedJobTypes },
|
||||||
|
};
|
||||||
|
|
||||||
|
// Filter by outputSchemaVersion if Runtime declared its capabilities
|
||||||
|
if (supOutput.length > 0) {
|
||||||
|
where.outputSchemaVersion = { in: supOutput };
|
||||||
|
}
|
||||||
|
|
||||||
const jobs = await this.prisma.aiRuntimeJob.findMany({
|
const jobs = await this.prisma.aiRuntimeJob.findMany({
|
||||||
where: {
|
where,
|
||||||
status: 'pending',
|
|
||||||
jobType: { in: supportedJobTypes },
|
|
||||||
},
|
|
||||||
orderBy: [{ priority: 'asc' }, { createdAt: 'asc' }],
|
orderBy: [{ priority: 'asc' }, { createdAt: 'asc' }],
|
||||||
take: Math.min(limit || 5, 50),
|
take: Math.min(limit || 5, 50),
|
||||||
select: {
|
select: {
|
||||||
@ -24,6 +34,22 @@ export class RuntimeInternalService {
|
|||||||
priority: true, snapshotId: true, promptVersion: true, outputSchemaVersion: true,
|
priority: true, snapshotId: true, promptVersion: true, outputSchemaVersion: true,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Post-filter by snapshotVersion if needed (requires snapshot join)
|
||||||
|
if (supSnapshot.length > 0) {
|
||||||
|
const snapshotIds = [...new Set(jobs.map(j => j.snapshotId).filter(Boolean))];
|
||||||
|
if (snapshotIds.length > 0) {
|
||||||
|
const snapshots = await this.prisma.learningAnalysisSnapshot.findMany({
|
||||||
|
where: { id: { in: snapshotIds as string[] } },
|
||||||
|
select: { id: true, snapshotVersion: true },
|
||||||
|
});
|
||||||
|
const compatibleIds = new Set(
|
||||||
|
snapshots.filter(s => supSnapshot.includes(s.snapshotVersion)).map(s => s.id)
|
||||||
|
);
|
||||||
|
return { jobs: jobs.filter(j => !j.snapshotId || compatibleIds.has(j.snapshotId)) };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return { jobs };
|
return { jobs };
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -66,9 +92,14 @@ export class RuntimeInternalService {
|
|||||||
const now = new Date();
|
const now = new Date();
|
||||||
const lockUntil = new Date(now.getTime() + 60_000);
|
const lockUntil = new Date(now.getTime() + 60_000);
|
||||||
|
|
||||||
|
// First heartbeat: locked → running; subsequent heartbeats: extend lockUntil
|
||||||
const result = await this.prisma.aiRuntimeJob.updateMany({
|
const result = await this.prisma.aiRuntimeJob.updateMany({
|
||||||
where: { id: jobId, lockedBy: runtimeInstanceId, status: 'locked' },
|
where: {
|
||||||
data: { lockUntil },
|
id: jobId,
|
||||||
|
lockedBy: runtimeInstanceId,
|
||||||
|
status: { in: ['locked', 'running'] },
|
||||||
|
},
|
||||||
|
data: { lockUntil, status: 'running' },
|
||||||
});
|
});
|
||||||
|
|
||||||
if (result.count === 0) {
|
if (result.count === 0) {
|
||||||
@ -141,6 +172,9 @@ export class RuntimeInternalService {
|
|||||||
}) {
|
}) {
|
||||||
const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } });
|
const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } });
|
||||||
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
|
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
|
||||||
|
if (job.status !== 'locked' && job.status !== 'running') {
|
||||||
|
throw new ConflictException({ errorCode: 'JOB_NOT_ACTIVE', message: `Job is ${job.status}, cannot accept result` });
|
||||||
|
}
|
||||||
|
|
||||||
const resultIdempotencyKey = `${jobId}:${dto.attemptNo}:${dto.outputHash ?? ''}`;
|
const resultIdempotencyKey = `${jobId}:${dto.attemptNo}:${dto.outputHash ?? ''}`;
|
||||||
|
|
||||||
@ -187,6 +221,9 @@ export class RuntimeInternalService {
|
|||||||
}) {
|
}) {
|
||||||
const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } });
|
const job = await this.prisma.aiRuntimeJob.findUnique({ where: { id: jobId } });
|
||||||
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
|
if (!job) throw new NotFoundException({ errorCode: 'JOB_NOT_FOUND', message: 'Job not found' });
|
||||||
|
if (job.status !== 'locked' && job.status !== 'running') {
|
||||||
|
throw new ConflictException({ errorCode: 'JOB_NOT_ACTIVE', message: `Job is ${job.status}, cannot accept failure` });
|
||||||
|
}
|
||||||
|
|
||||||
const newRetryCount = job.retryCount + 1;
|
const newRetryCount = job.retryCount + 1;
|
||||||
const exceeded = newRetryCount > job.maxRetryCount;
|
const exceeded = newRetryCount > job.maxRetryCount;
|
||||||
|
|||||||
@ -166,6 +166,12 @@ export class UserAiService {
|
|||||||
if (!cred) throw new NotFoundException('Credential not found or not active');
|
if (!cred) throw new NotFoundException('Credential not found or not active');
|
||||||
|
|
||||||
const apiKey = this.crypto.decrypt(cred.encryptedApiKey);
|
const apiKey = this.crypto.decrypt(cred.encryptedApiKey);
|
||||||
|
|
||||||
|
await this.prisma.userModelCredential.update({
|
||||||
|
where: { id: credId },
|
||||||
|
data: { lastUsedAt: new Date() },
|
||||||
|
});
|
||||||
|
|
||||||
return { provider: cred.provider, apiKey };
|
return { provider: cred.provider, apiKey };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user