Skip to content

improve batch completion system for run engine v1 #1656

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ const EnvironmentSchema = z.object({
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),
MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500),

REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/presenters/v3/BatchListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class BatchListPresenter extends BasePresenter {
status: BatchTaskRunStatus;
createdAt: Date;
updatedAt: Date;
completedAt: Date | null;
runCount: BigInt;
batchVersion: string;
}[]
Expand All @@ -111,6 +112,7 @@ export class BatchListPresenter extends BasePresenter {
b.status,
b."createdAt",
b."updatedAt",
b."completedAt",
b."runCount",
b."batchVersion"
FROM
Expand Down Expand Up @@ -196,7 +198,11 @@ WHERE
createdAt: batch.createdAt.toISOString(),
updatedAt: batch.updatedAt.toISOString(),
hasFinished,
finishedAt: hasFinished ? batch.updatedAt.toISOString() : undefined,
finishedAt: batch.completedAt
? batch.completedAt.toISOString()
: hasFinished
? batch.updatedAt.toISOString()
: undefined,
status: batch.status,
environment: displayableEnvironment(environment, userId),
runCount: Number(batch.runCount),
Expand Down
9 changes: 5 additions & 4 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const { action, loader } = createActionApiRoute(

const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL);

const run = await service.call(params.taskId, authentication.environment, body, {
const result = await service.call(params.taskId, authentication.environment, body, {
idempotencyKey: idempotencyKey ?? undefined,
idempotencyKeyExpiresAt: idempotencyKeyExpiresAt,
triggerVersion: triggerVersion ?? undefined,
Expand All @@ -83,19 +83,20 @@ const { action, loader } = createActionApiRoute(
oneTimeUseToken,
});

if (!run) {
if (!result) {
return json({ error: "Task not found" }, { status: 404 });
}

const $responseHeaders = await responseHeaders(
run,
result.run,
authentication.environment,
triggerClient
);

return json(
{
id: run.friendlyId,
id: result.run.friendlyId,
isCached: result.isCached,
},
{
headers: $responseHeaders,
Expand Down
33 changes: 22 additions & 11 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
BatchTriggerV3Service,
} from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";

Expand All @@ -40,13 +40,24 @@ const { action, loader } = createActionApiRoute(
}

// Check the there are fewer than MAX_BATCH_V2_TRIGGER_ITEMS items
if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) {
return json(
{
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`,
},
{ status: 400 }
);
if (body.dependentAttempt) {
if (body.items.length > env.MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS) {
return json(
{
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_AND_WAIT_V2_TRIGGER_ITEMS} when batchTriggerAndWait.`,
},
{ status: 400 }
);
}
} else {
if (body.items.length > env.MAX_BATCH_V2_TRIGGER_ITEMS) {
return json(
{
error: `Batch size of ${body.items.length} is too large. Maximum allowed batch size is ${env.MAX_BATCH_V2_TRIGGER_ITEMS}.`,
},
{ status: 400 }
);
}
}

const {
Expand Down Expand Up @@ -85,7 +96,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
const service = new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down Expand Up @@ -118,7 +129,7 @@ const { action, loader } = createActionApiRoute(
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json(
{ error: error.message },
{ error: "Something went wrong" },
{ status: 500, headers: { "x-should-retry": "false" } }
);
}
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import {
CancelDevSessionRunsServiceOptions,
} from "~/v3/services/cancelDevSessionRuns.server";
import { logger } from "./logger.server";
import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
import { BatchProcessingOptions, BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -733,7 +733,7 @@ function getWorkerQueue() {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV2Service(payload.strategy);
const service = new BatchTriggerV3Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
Expand Down
10 changes: 5 additions & 5 deletions apps/webapp/app/v3/services/batchTriggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class BatchTriggerTaskService extends BaseService {

for (const item of body.items) {
try {
const run = await triggerTaskService.call(
const result = await triggerTaskService.call(
taskId,
environment,
{
Expand All @@ -123,16 +123,16 @@ export class BatchTriggerTaskService extends BaseService {
}
);

if (run) {
if (result) {
await this._prisma.batchTaskRunItem.create({
data: {
batchTaskRunId: batch.id,
taskRunId: run.id,
status: batchTaskRunItemStatusForRunStatus(run.status),
taskRunId: result.run.id,
status: batchTaskRunItemStatusForRunStatus(result.run.status),
},
});

runs.push(run.friendlyId);
runs.push(result.run.friendlyId);
}

index++;
Expand Down
Loading