diff --git a/.zed/tasks.json b/.zed/tasks.json new file mode 100644 index 0000000000..8612e16bfb --- /dev/null +++ b/.zed/tasks.json @@ -0,0 +1,45 @@ +[ + { + "label": "Build packages", + "command": "pnpm run build --filter \"@trigger.dev/*\" --filter trigger.dev", + //"args": [], + // Env overrides for the command, will be appended to the terminal's environment from the settings. + "env": { "foo": "bar" }, + // Current working directory to spawn the command into, defaults to current project root. + //"cwd": "/path/to/working/directory", + // Whether to use a new terminal tab or reuse the existing one to spawn the process, defaults to `false`. + "use_new_terminal": false, + // Whether to allow multiple instances of the same task to be run, or rather wait for the existing ones to finish, defaults to `false`. + "allow_concurrent_runs": false, + // What to do with the terminal pane and tab, after the command was started: + // * `always` — always show the task's pane, and focus the corresponding tab in it (default) + // * `no_focus` — always show the task's pane, add the task's tab in it, but don't focus it + // * `never` — do not alter focus, but still add/reuse the task's tab in its pane + "reveal": "always", + // What to do with the terminal pane and tab, after the command has finished: + // * `never` — Do nothing when the command finishes (default) + // * `always` — always hide the terminal tab, hide the pane also if it was the last tab in it + // * `on_success` — hide the terminal tab on task success only, otherwise behaves similar to `always` + "hide": "never", + // Which shell to use when running a task inside the terminal. + // May take 3 values: + // 1. (default) Use the system's default terminal configuration in /etc/passwd + // "shell": "system" + // 2. A program: + // "shell": { + // "program": "sh" + // } + // 3. A program with arguments: + // "shell": { + // "with_arguments": { + // "program": "/bin/bash", + // "args": ["--login"] + // } + // } + "shell": "system", + // Whether to show the task line in the output of the spawned task, defaults to `true`. + "show_summary": true, + // Whether to show the command line in the output of the spawned task, defaults to `true`. 
+ "show_output": true + } +] diff --git a/apps/docker-provider/src/index.ts b/apps/docker-provider/src/index.ts index 3ca5184c75..a0b0554fb2 100644 --- a/apps/docker-provider/src/index.ts +++ b/apps/docker-provider/src/index.ts @@ -8,7 +8,7 @@ import { } from "@trigger.dev/core/v3/apps"; import { SimpleLogger } from "@trigger.dev/core/v3/apps"; import { isExecaChildProcess } from "@trigger.dev/core/v3/apps"; -import { testDockerCheckpoint } from "@trigger.dev/core/v3/checkpoints"; +import { testDockerCheckpoint } from "@trigger.dev/core/v3/serverOnly"; import { setTimeout } from "node:timers/promises"; import { PostStartCauses, PreStopCauses } from "@trigger.dev/core/v3"; diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts index 3c8d95cd5e..c373b11f8a 100644 --- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts +++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts @@ -13,6 +13,7 @@ import { RunFailedWebhook, DeploymentFailedWebhook, DeploymentSuccessWebhook, + isOOMRunError, } from "@trigger.dev/core/v3"; import assertNever from "assert-never"; import { subtle } from "crypto"; @@ -39,7 +40,6 @@ import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; import { ProjectAlertChannelType, ProjectAlertType } from "@trigger.dev/database"; import { alertsRateLimiter } from "~/v3/alertsRateLimiter.server"; import { v3RunPath } from "~/utils/pathBuilder"; -import { isOOMError } from "../completeAttempt.server"; import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server"; type FoundAlert = Prisma.Result< @@ -375,7 +375,7 @@ export class DeliverAlertService extends BaseService { idempotencyKey: alert.taskRun.idempotencyKey ?? undefined, tags: alert.taskRun.runTags, error, - isOutOfMemoryError: isOOMError(error), + isOutOfMemoryError: isOOMRunError(error), machine: alert.taskRun.machinePreset ?? "Unknown", dashboardUrl: `${env.APP_ORIGIN}${v3RunPath( alert.project.organization, diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 324750f1a8..73c8fbc88d 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -11,6 +11,7 @@ import { TaskRunSuccessfulExecutionResult, flattenAttributes, isManualOutOfMemoryError, + isOOMRunError, sanitizeError, shouldRetryError, taskRunErrorEnhancer, @@ -255,7 +256,7 @@ export class CompleteAttemptService extends BaseService { let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); let isOOMRetry = false; - let isOOMAttempt = isOOMError(completion.error); + let isOOMAttempt = isOOMRunError(completion.error); let isOnMaxOOMMachine = false; let oomMachine: MachinePresetName | undefined; @@ -738,45 +739,6 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId: }); } -export function isOOMError(error: TaskRunError) { - if (error.type === "INTERNAL_ERROR") { - if ( - error.code === "TASK_PROCESS_OOM_KILLED" || - error.code === "TASK_PROCESS_MAYBE_OOM_KILLED" - ) { - return true; - } - - // For the purposes of retrying on a larger machine, we're going to treat this is an OOM error. - // This is what they look like if we're executing using k8s. They then get corrected later, but it's too late. 
- // {"code": "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE", "type": "INTERNAL_ERROR", "message": "Process exited with code -1 after signal SIGKILL."} - if ( - error.code === "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" && - error.message && - error.message.includes("SIGKILL") && - error.message.includes("-1") - ) { - return true; - } - } - - if (error.type === "BUILT_IN_ERROR") { - // ffmpeg also does weird stuff - // { "name": "Error", "type": "BUILT_IN_ERROR", "message": "ffmpeg was killed with signal SIGKILL" } - if (error.message && error.message.includes("ffmpeg was killed with signal SIGKILL")) { - return true; - } - } - - // Special `OutOfMemoryError` for doing a manual OOM kill. - // Useful if a native library does an OOM but doesn't actually crash the run and you want to manually - if (isManualOutOfMemoryError(error)) { - return true; - } - - return false; -} - function exitRun(runId: string) { socketIo.coordinatorNamespace.emit("REQUEST_RUN_CANCELLATION", { version: "v1", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index cddf3994d8..fb14a2f9a7 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -71,6 +71,7 @@ import { isPendingExecuting, } from "./statuses"; import { HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types"; +import { retryOutcomeFromCompletion } from "./retrying"; const workerCatalog = { finishWaitpoint: { @@ -2867,228 +2868,193 @@ export class RunEngine { const failedAt = new Date(); - if ( - completion.error.type === "INTERNAL_ERROR" && - completion.error.code === "TASK_RUN_CANCELLED" - ) { - // We need to cancel the task run instead of fail it - const result = await this.cancelRun({ - runId, - completedAt: failedAt, - reason: completion.error.message, - finalizeRun: true, - tx: prisma, - }); - return { - attemptStatus: - result.snapshot.executionStatus === "PENDING_CANCEL" - ? "RUN_PENDING_CANCEL" - : "RUN_FINISHED", - ...result, - }; - } - - const error = sanitizeError(completion.error); - const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error)); - - const permanentlyFailRun = async (run?: { - status: TaskRunStatus; - spanId: string; - createdAt: Date; - completedAt: Date | null; - taskEventStore: string; - }) => { - // Emit an event so we can complete any spans of stalled executions - if (forceRequeue && run) { - this.eventBus.emit("runAttemptFailed", { - time: failedAt, - run: { - id: runId, - status: run.status, - spanId: run.spanId, - error, - attemptNumber: latestSnapshot.attemptNumber ?? 0, - createdAt: run.createdAt, - completedAt: run.completedAt, - taskEventStore: run.taskEventStore, - }, - }); - } - - return await this.#permanentlyFailRun({ - runId, - snapshotId, - failedAt, - error, - workerId, - runnerId, - }); - }; - - // Error is not retriable, fail the run - if (!retriableError) { - return await permanentlyFailRun(); - } - - // No retry config attached to completion, fail the run - if (completion.retry === undefined) { - return await permanentlyFailRun(); - } - - // Run attempts have reached the global maximum, fail the run - if ( - latestSnapshot.attemptNumber !== null && - latestSnapshot.attemptNumber >= MAX_TASK_RUN_ATTEMPTS - ) { - return await permanentlyFailRun(); - } + const retryResult = await retryOutcomeFromCompletion(prisma, { + runId, + error: completion.error, + retryUsingQueue: forceRequeue ?? 
false, + retrySettings: completion.retry, + attemptNumber: latestSnapshot.attemptNumber, + }); - const minimalRun = await prisma.taskRun.findFirst({ - where: { - id: runId, - }, - select: { - status: true, - spanId: true, - maxAttempts: true, - runtimeEnvironment: { - select: { - organizationId: true, + // Force requeue means the run crashed, so the attempt span needs to be closed + if (forceRequeue) { + const minimalRun = await prisma.taskRun.findFirst({ + where: { + id: runId, + }, + select: { + status: true, + spanId: true, + maxAttempts: true, + runtimeEnvironment: { + select: { + organizationId: true, + }, }, + taskEventStore: true, + createdAt: true, + completedAt: true, }, - taskEventStore: true, - createdAt: true, - completedAt: true, - }, - }); - - if (!minimalRun) { - throw new ServiceValidationError("Run not found", 404); - } - - // Run doesn't have any max attempts set which is required for retrying, fail the run - if (!minimalRun.maxAttempts) { - return await permanentlyFailRun(minimalRun); - } + }); - // Run has reached the maximum configured number of attempts, fail the run - if ( - latestSnapshot.attemptNumber !== null && - latestSnapshot.attemptNumber >= minimalRun.maxAttempts - ) { - return await permanentlyFailRun(minimalRun); - } + if (!minimalRun) { + throw new ServiceValidationError("Run not found", 404); + } - // This error didn't come from user code, so we need to emit an event to complete any spans - if (forceRequeue) { this.eventBus.emit("runAttemptFailed", { time: failedAt, run: { id: runId, status: minimalRun.status, spanId: minimalRun.spanId, - error, + error: completion.error, attemptNumber: latestSnapshot.attemptNumber ?? 0, - taskEventStore: minimalRun.taskEventStore, createdAt: minimalRun.createdAt, completedAt: minimalRun.completedAt, + taskEventStore: minimalRun.taskEventStore, }, }); } - const retryAt = new Date(completion.retry.timestamp); + switch (retryResult.outcome) { + case "cancel_run": { + const result = await this.cancelRun({ + runId, + completedAt: failedAt, + reason: retryResult.reason, + finalizeRun: true, + tx: prisma, + }); + return { + attemptStatus: + result.snapshot.executionStatus === "PENDING_CANCEL" + ? "RUN_PENDING_CANCEL" + : "RUN_FINISHED", + ...result, + }; + } + case "fail_run": { + return await this.#permanentlyFailRun({ + runId, + snapshotId, + failedAt, + error: retryResult.sanitizedError, + workerId, + runnerId, + }); + } + case "retry": { + const retryAt = new Date(retryResult.settings.timestamp); - const run = await prisma.taskRun.update({ - where: { - id: runId, - }, - data: { - status: "RETRYING_AFTER_FAILURE", - }, - include: { - runtimeEnvironment: { + const run = await prisma.taskRun.update({ + where: { + id: runId, + }, + data: { + status: "RETRYING_AFTER_FAILURE", + machinePreset: retryResult.machine, + }, include: { - project: true, - organization: true, - orgMember: true, + runtimeEnvironment: { + include: { + project: true, + organization: true, + orgMember: true, + }, + }, }, - }, - }, - }); + }); - const nextAttemptNumber = - latestSnapshot.attemptNumber === null ? 
1 : latestSnapshot.attemptNumber + 1; - this.eventBus.emit("runRetryScheduled", { - time: failedAt, - run: { - id: run.id, - friendlyId: run.friendlyId, - attemptNumber: nextAttemptNumber, - queue: run.queue, - taskIdentifier: run.taskIdentifier, - traceContext: run.traceContext as Record<string, unknown>, - baseCostInCents: run.baseCostInCents, - spanId: run.spanId, - }, - organization: { - id: run.runtimeEnvironment.organizationId, - }, - environment: run.runtimeEnvironment, - retryAt, - }); + if (retryResult.wasOOMError) { + this.eventBus.emit("runAttemptFailed", { + time: failedAt, + run: { + id: runId, + status: run.status, + spanId: run.spanId, + error: completion.error, + attemptNumber: latestSnapshot.attemptNumber ?? 0, + createdAt: run.createdAt, + completedAt: run.completedAt, + taskEventStore: run.taskEventStore, + }, + }); + } - //todo anything special for DEV? Ideally not. + this.eventBus.emit("runRetryScheduled", { + time: failedAt, + run: { + id: run.id, + friendlyId: run.friendlyId, + attemptNumber: nextAttemptNumber, + queue: run.queue, + taskIdentifier: run.taskIdentifier, + traceContext: run.traceContext as Record<string, unknown>, + baseCostInCents: run.baseCostInCents, + spanId: run.spanId, + }, + organization: { + id: run.runtimeEnvironment.organizationId, + }, + environment: run.runtimeEnvironment, + retryAt, + }); - //if it's a long delay and we support checkpointing, put it back in the queue - if ( - forceRequeue || - (this.options.retryWarmStartThresholdMs !== undefined && - completion.retry.delay >= this.options.retryWarmStartThresholdMs) - ) { - //we nack the message, requeuing it for later - const nackResult = await this.#tryNackAndRequeue({ - run, - environment: run.runtimeEnvironment, - orgId: run.runtimeEnvironment.organizationId, - timestamp: retryAt.getTime(), - error: { - type: "INTERNAL_ERROR", - code: "TASK_RUN_DEQUEUED_MAX_RETRIES", - message: `We tried to dequeue the run the maximum number of times but it wouldn't start executing`, - }, - tx: prisma, - }); + //if it's a long delay and we support checkpointing, put it back in the queue + if ( + forceRequeue || + retryResult.method === "queue" || + (this.options.retryWarmStartThresholdMs !== undefined && + retryResult.settings.delay >= this.options.retryWarmStartThresholdMs) + ) { + //we nack the message, requeuing it for later + const nackResult = await this.#tryNackAndRequeue({ + run, + environment: run.runtimeEnvironment, + orgId: run.runtimeEnvironment.organizationId, + timestamp: retryAt.getTime(), + error: { + type: "INTERNAL_ERROR", + code: "TASK_RUN_DEQUEUED_MAX_RETRIES", + message: `We tried to dequeue the run the maximum number of times but it wouldn't start executing`, + }, + tx: prisma, + }); + + if (!nackResult.wasRequeued) { + return { + attemptStatus: "RUN_FINISHED", + ...nackResult, + }; + } else { + return { attemptStatus: "RETRY_QUEUED", ...nackResult }; + } + } + + //it will continue running because the retry delay is short + const newSnapshot = await this.#createExecutionSnapshot(prisma, { + run, + snapshot: { + executionStatus: "PENDING_EXECUTING", + description: "Attempt failed with a short delay, starting a new attempt", + }, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + workerId, + runnerId, + }); + //the worker can fetch the latest snapshot and should create a new attempt + await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot }); - if (!nackResult.wasRequeued) { return { - attemptStatus: "RUN_FINISHED", - ...nackResult, + attemptStatus: 
"RETRY_IMMEDIATELY", + ...executionResultFromSnapshot(newSnapshot), }; - } else { - return { attemptStatus: "RETRY_QUEUED", ...nackResult }; } } - - //it will continue running because the retry delay is short - const newSnapshot = await this.#createExecutionSnapshot(prisma, { - run, - snapshot: { - executionStatus: "PENDING_EXECUTING", - description: "Attempt failed with a short delay, starting a new attempt", - }, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - workerId, - runnerId, - }); - //the worker can fetch the latest snapshot and should create a new attempt - await this.#sendNotificationToWorker({ runId, snapshot: newSnapshot }); - - return { - attemptStatus: "RETRY_IMMEDIATELY", - ...executionResultFromSnapshot(newSnapshot), - }; }); }); } diff --git a/internal-packages/run-engine/src/engine/retrying.ts b/internal-packages/run-engine/src/engine/retrying.ts new file mode 100644 index 0000000000..974bcd1151 --- /dev/null +++ b/internal-packages/run-engine/src/engine/retrying.ts @@ -0,0 +1,175 @@ +import { + isOOMRunError, + RetryOptions, + shouldRetryError, + TaskRunError, + TaskRunExecutionRetry, + taskRunErrorEnhancer, + sanitizeError, + calculateNextRetryDelay, +} from "@trigger.dev/core/v3"; +import { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database"; +import { MAX_TASK_RUN_ATTEMPTS } from "./consts"; +import { ServiceValidationError } from "."; + +type Params = { + runId: string; + attemptNumber: number | null; + error: TaskRunError; + retryUsingQueue: boolean; + retrySettings: TaskRunExecutionRetry | undefined; +}; + +export type RetryOutcome = + | { + outcome: "cancel_run"; + reason?: string; + } + | { + outcome: "fail_run"; + sanitizedError: TaskRunError; + wasOOMError?: boolean; + } + | { + outcome: "retry"; + method: "queue" | "immediate"; + settings: TaskRunExecutionRetry; + machine?: string; + wasOOMError?: boolean; + }; + +export async function retryOutcomeFromCompletion( + prisma: PrismaClientOrTransaction, + { runId, attemptNumber, error, retryUsingQueue, retrySettings }: Params ): Promise<RetryOutcome> { + // Canceled + if (error.type === "INTERNAL_ERROR" && error.code === "TASK_RUN_CANCELLED") { + return { outcome: "cancel_run", reason: error.message }; + } + + const sanitizedError = sanitizeError(error); + + // OOM error (retry on a larger machine or fail) + if (isOOMRunError(error)) { + const oomResult = await retryOOMOnMachine(prisma, runId); + if (!oomResult) { + return { outcome: "fail_run", sanitizedError, wasOOMError: true }; + } + + const delay = calculateNextRetryDelay(oomResult.retrySettings, attemptNumber ?? 
1); + + if (!delay) { + //no more retries left + return { outcome: "fail_run", sanitizedError, wasOOMError: true }; + } + + return { + outcome: "retry", + method: "queue", + machine: oomResult.machine, + settings: { timestamp: Date.now() + delay, delay }, + wasOOMError: true, + }; + } + + // No retry settings + if (!retrySettings) { + return { outcome: "fail_run", sanitizedError }; + } + + // Not a retriable error: fail + const retriableError = shouldRetryError(taskRunErrorEnhancer(error)); + if (!retriableError) { + return { outcome: "fail_run", sanitizedError }; + } + + // Exceeded global max attempts + if (attemptNumber !== null && attemptNumber > MAX_TASK_RUN_ATTEMPTS) { + return { outcome: "fail_run", sanitizedError }; + } + + // Get the run settings + const run = await prisma.taskRun.findFirst({ + where: { + id: runId, + }, + select: { + maxAttempts: true, + }, + }); + + if (!run) { + throw new ServiceValidationError("Run not found", 404); + } + + // No max attempts set + if (!run.maxAttempts) { + return { outcome: "fail_run", sanitizedError }; + } + + // No attempts left + if (attemptNumber !== null && attemptNumber >= run.maxAttempts) { + return { outcome: "fail_run", sanitizedError }; + } + + return { + outcome: "retry", + method: retryUsingQueue ? "queue" : "immediate", + settings: retrySettings, + }; +} + +async function retryOOMOnMachine( + prisma: PrismaClientOrTransaction, + runId: string +): Promise<{ machine: string; retrySettings: RetryOptions } | undefined> { + try { + const run = await prisma.taskRun.findFirst({ + where: { + id: runId, + }, + select: { + machinePreset: true, + lockedBy: { + select: { + retryConfig: true, + }, + }, + }, + }); + + if (!run || !run.lockedBy || !run.machinePreset) { + return; + } + + const retryConfig = run.lockedBy?.retryConfig; + const parsedRetryConfig = RetryOptions.nullish().safeParse(retryConfig); + + if (!parsedRetryConfig.success) { + return; + } + + if (!parsedRetryConfig.data) { + return; + } + + const retryMachine = parsedRetryConfig.data.outOfMemory?.machine; + + if (!retryMachine) { + return; + } + + if (run.machinePreset === retryMachine) { + return; + } + + return { machine: retryMachine, retrySettings: parsedRetryConfig.data }; + } catch (error) { + console.error("[FailedTaskRunRetryHelper] Failed to get execution retry", { + runId, + error, + }); + + return; + } +} diff --git a/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts new file mode 100644 index 0000000000..232d090a73 --- /dev/null +++ b/internal-packages/run-engine/src/engine/tests/attemptFailures.test.ts @@ -0,0 +1,830 @@ +import { + assertNonNullable, + containerTest, + setupAuthenticatedEnvironment, + setupBackgroundWorker, +} from "@internal/testcontainers"; +import { trace } from "@opentelemetry/api"; +import { expect } from "vitest"; +import { EventBusEventArgs } from "../eventBus.js"; +import { RunEngine } from "../index.js"; +import { setTimeout } from "node:timers/promises"; + +describe("RunEngine attempt failures", () => { + containerTest( + "Retry user error and succeed", + { timeout: 15_000 }, + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + 
}, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + const backgroundWorker = await setupBackgroundWorker( + prisma, + authenticatedEnvironment, + taskIdentifier + ); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the attempt + const error = { + type: "BUILT_IN_ERROR" as const, + name: "UserError", + message: "This is a user error", + stackTrace: "Error: This is a user error\n at :1:1", + }; + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + expect(result.attemptStatus).toBe("RETRY_IMMEDIATELY"); + expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING"); + expect(result.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //state should be pending + const executionData3 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData3); + expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING"); + //only when the new attempt is created, should the attempt be increased + expect(executionData3.run.attemptNumber).toBe(1); + expect(executionData3.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //create a second attempt + const attemptResult2 = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: executionData3.snapshot.id, + }); + expect(attemptResult2.run.attemptNumber).toBe(2); + + //now complete it successfully + const result2 = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult2.snapshot.id, + completion: { + ok: true, + id: dequeued[0].run.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + expect(result2.snapshot.executionStatus).toBe("FINISHED"); + expect(result2.run.attemptNumber).toBe(2); + expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY"); + + //waitpoint should have been completed, with the output + const runWaitpointAfter = await prisma.waitpoint.findMany({ + where: { + completedByTaskRunId: run.id, + }, + }); + expect(runWaitpointAfter.length).toBe(1); + expect(runWaitpointAfter[0].type).toBe("RUN"); + expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`); + expect(runWaitpointAfter[0].outputIsError).toBe(false); + + //state should be completed + const executionData4 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData4); + expect(executionData4.snapshot.executionStatus).toBe("FINISHED"); + expect(executionData4.run.attemptNumber).toBe(2); + 
expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY"); + } finally { + engine.quit(); + } + } + ); + + containerTest("Fail (no more retries)", { timeout: 15_000 }, async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + maxAttempts: 1, + }); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the attempt + const error = { + type: "BUILT_IN_ERROR" as const, + name: "UserError", + message: "This is a user error", + stackTrace: "Error: This is a user error\n at :1:1", + }; + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + expect(result.attemptStatus).toBe("RUN_FINISHED"); + expect(result.snapshot.executionStatus).toBe("FINISHED"); + expect(result.run.status).toBe("COMPLETED_WITH_ERRORS"); + + //state should be failed + const executionData3 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData3); + expect(executionData3.snapshot.executionStatus).toBe("FINISHED"); + //the attempt number stays at 1 because no retry is scheduled + expect(executionData3.run.attemptNumber).toBe(1); + expect(executionData3.run.status).toBe("COMPLETED_WITH_ERRORS"); + } finally { + engine.quit(); + } + }); + + containerTest( + "Fail (not a retriable error)", + { timeout: 15_000 }, + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await 
setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + maxAttempts: 1, + }); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the attempt with an unretriable error + const error = { + type: "INTERNAL_ERROR" as const, + code: "DISK_SPACE_EXCEEDED" as const, + }; + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + expect(result.attemptStatus).toBe("RUN_FINISHED"); + expect(result.snapshot.executionStatus).toBe("FINISHED"); + expect(result.run.status).toBe("CRASHED"); + + //state should be crashed + const executionData3 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData3); + expect(executionData3.snapshot.executionStatus).toBe("FINISHED"); + //the attempt number stays at 1 because no retry is scheduled + expect(executionData3.run.attemptNumber).toBe(1); + expect(executionData3.run.status).toBe("CRASHED"); + } finally { + engine.quit(); + } + } + ); + + containerTest("OOM fail", { timeout: 15_000 }, async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the attempt with an OOM error + const error = { + type: "INTERNAL_ERROR" as const, + code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const, + message: "Process exited with code -1 after signal 
SIGKILL.", + stackTrace: "JavaScript heap out of memory", + }; + + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + }, + }); + + // The run should fail because no OOM retry machine is configured + expect(result.attemptStatus).toBe("RUN_FINISHED"); + expect(result.snapshot.executionStatus).toBe("FINISHED"); + expect(result.run.status).toBe("CRASHED"); + + //state should be crashed + const executionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData); + expect(executionData.snapshot.executionStatus).toBe("FINISHED"); + expect(executionData.run.attemptNumber).toBe(1); + expect(executionData.run.status).toBe("CRASHED"); + } finally { + engine.quit(); + } + }); + + containerTest( + "OOM retry on larger machine", + { timeout: 15_000 }, + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + "small-2x": { + name: "small-2x" as const, + cpu: 1, + memory: 1, + centsPerMs: 0.0002, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + outOfMemory: { + machine: "small-2x", + }, + }); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create an attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the attempt with an OOM error + const error = { + type: "INTERNAL_ERROR" as const, + code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const, + message: "Process exited with code -1 after signal SIGKILL.", + stackTrace: "JavaScript heap out of memory", + }; + + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + }, + }); + + // The run should be retried with a larger machine + expect(result.attemptStatus).toBe("RETRY_QUEUED"); + expect(result.snapshot.executionStatus).toBe("QUEUED"); + expect(result.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //state should be queued + const executionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData); + expect(executionData.snapshot.executionStatus).toBe("QUEUED"); + expect(executionData.run.attemptNumber).toBe(1); + 
expect(executionData.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //create a second attempt + const attemptResult2 = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: executionData.snapshot.id, + }); + expect(attemptResult2.run.attemptNumber).toBe(2); + + //now complete it successfully + const result2 = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult2.snapshot.id, + completion: { + ok: true, + id: dequeued[0].run.id, + output: `{"foo":"bar"}`, + outputType: "application/json", + }, + }); + expect(result2.snapshot.executionStatus).toBe("FINISHED"); + expect(result2.run.attemptNumber).toBe(2); + expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY"); + + //waitpoint should have been completed, with the output + const runWaitpointAfter = await prisma.waitpoint.findMany({ + where: { + completedByTaskRunId: run.id, + }, + }); + expect(runWaitpointAfter.length).toBe(1); + expect(runWaitpointAfter[0].type).toBe("RUN"); + expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`); + expect(runWaitpointAfter[0].outputIsError).toBe(false); + + //state should be completed + const executionData4 = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData4); + expect(executionData4.snapshot.executionStatus).toBe("FINISHED"); + expect(executionData4.run.attemptNumber).toBe(2); + expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY"); + } finally { + engine.quit(); + } + } + ); + + containerTest( + "OOM fails after retrying on larger machine", + { timeout: 15_000 }, + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + "small-2x": { + name: "small-2x" as const, + cpu: 1, + memory: 1, + centsPerMs: 0.0002, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + //create background worker + await setupBackgroundWorker(prisma, authenticatedEnvironment, taskIdentifier, undefined, { + maxTimeoutInMs: 10, + maxAttempts: 10, + outOfMemory: { + machine: "small-2x", + }, + }); + + //trigger the run + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_1234", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + masterQueue: "main", + queueName: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + //dequeue the run + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + + //create first attempt + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + //fail the first attempt with an OOM error + const error = { + type: "INTERNAL_ERROR" as const, + code: "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" as const, + message: "Process exited with code -1 after signal SIGKILL.", + stackTrace: "JavaScript heap out of memory", + 
}; + + const result = await engine.completeRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: attemptResult.snapshot.id, + completion: { + ok: false, + id: dequeued[0].run.id, + error, + }, + }); + + // The run should be retried with a larger machine + expect(result.attemptStatus).toBe("RETRY_QUEUED"); + expect(result.snapshot.executionStatus).toBe("QUEUED"); + expect(result.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //state should be queued + const executionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(executionData); + expect(executionData.snapshot.executionStatus).toBe("QUEUED"); + expect(executionData.run.attemptNumber).toBe(1); + expect(executionData.run.status).toBe("RETRYING_AFTER_FAILURE"); + + //wait for 1s + await setTimeout(1_000); + + //dequeue again + const dequeued2 = await engine.dequeueFromMasterQueue({ + consumerId: "test_12345", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + expect(dequeued2.length).toBe(1); + + //create second attempt + const attemptResult2 = await engine.startRunAttempt({ + runId: dequeued2[0].run.id, + snapshotId: dequeued2[0].snapshot.id, + }); + expect(attemptResult2.run.attemptNumber).toBe(2); + + //fail the second attempt with the same OOM error + const result2 = await engine.completeRunAttempt({ + runId: dequeued2[0].run.id, + snapshotId: attemptResult2.snapshot.id, + completion: { + ok: false, + id: dequeued2[0].run.id, + error, + retry: { + timestamp: Date.now(), + delay: 0, + }, + }, + }); + + // The run should fail after the second OOM + expect(result2.attemptStatus).toBe("RUN_FINISHED"); + expect(result2.snapshot.executionStatus).toBe("FINISHED"); + expect(result2.run.status).toBe("CRASHED"); + + //final state should be crashed + const finalExecutionData = await engine.getRunExecutionData({ runId: run.id }); + assertNonNullable(finalExecutionData); + expect(finalExecutionData.snapshot.executionStatus).toBe("FINISHED"); + expect(finalExecutionData.run.attemptNumber).toBe(2); + expect(finalExecutionData.run.status).toBe("CRASHED"); + } finally { + engine.quit(); + } + } + ); +}); diff --git a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts index 6639253654..d2869886d0 100644 --- a/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts +++ b/internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts @@ -5,7 +5,7 @@ import { setupBackgroundWorker, } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; -import { expect } from "vitest"; +import { expect, describe } from "vitest"; import { RunEngine } from "../index.js"; import { setTimeout } from "node:timers/promises"; import { generateFriendlyId } from "@trigger.dev/core/v3/apps"; diff --git a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts index 1ff229cd7b..b606845490 100644 --- a/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts +++ b/internal-packages/run-engine/src/engine/tests/heartbeats.test.ts @@ -5,7 +5,7 @@ import { assertNonNullable, } from "@internal/testcontainers"; import { trace } from "@opentelemetry/api"; -import { expect } from "vitest"; +import { expect, describe } from "vitest"; import { RunEngine } from "../index.js"; import { setTimeout } from "timers/promises"; diff --git a/internal-packages/run-engine/src/engine/tests/trigger.test.ts 
b/internal-packages/run-engine/src/engine/tests/trigger.test.ts index 69dbb66b08..1eef5dd838 100644 --- a/internal-packages/run-engine/src/engine/tests/trigger.test.ts +++ b/internal-packages/run-engine/src/engine/tests/trigger.test.ts @@ -330,161 +330,4 @@ describe("RunEngine trigger()", () => { engine.quit(); } }); - - containerTest( - "Single run (retry attempt, then succeed)", - { timeout: 15_000 }, - async ({ prisma, redisOptions }) => { - //create environment - const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); - - const engine = new RunEngine({ - prisma, - worker: { - redis: redisOptions, - workers: 1, - tasksPerWorker: 10, - pollIntervalMs: 100, - }, - queue: { - redis: redisOptions, - }, - runLock: { - redis: redisOptions, - }, - machines: { - defaultMachine: "small-1x", - machines: { - "small-1x": { - name: "small-1x" as const, - cpu: 0.5, - memory: 0.5, - centsPerMs: 0.0001, - }, - }, - baseCostInCents: 0.0001, - }, - tracer: trace.getTracer("test", "0.0.0"), - }); - - try { - const taskIdentifier = "test-task"; - - //create background worker - const backgroundWorker = await setupBackgroundWorker( - prisma, - authenticatedEnvironment, - taskIdentifier - ); - - //trigger the run - const run = await engine.trigger( - { - number: 1, - friendlyId: "run_1234", - environment: authenticatedEnvironment, - taskIdentifier, - payload: "{}", - payloadType: "application/json", - context: {}, - traceContext: {}, - traceId: "t12345", - spanId: "s12345", - masterQueue: "main", - queueName: "task/test-task", - isTest: false, - tags: [], - }, - prisma - ); - - //dequeue the run - const dequeued = await engine.dequeueFromMasterQueue({ - consumerId: "test_12345", - masterQueue: run.masterQueue, - maxRunCount: 10, - }); - - //create an attempt - const attemptResult = await engine.startRunAttempt({ - runId: dequeued[0].run.id, - snapshotId: dequeued[0].snapshot.id, - }); - - //fail the attempt - const error = { - type: "BUILT_IN_ERROR" as const, - name: "UserError", - message: "This is a user error", - stackTrace: "Error: This is a user error\n at :1:1", - }; - const result = await engine.completeRunAttempt({ - runId: dequeued[0].run.id, - snapshotId: attemptResult.snapshot.id, - completion: { - ok: false, - id: dequeued[0].run.id, - error, - retry: { - timestamp: Date.now(), - delay: 0, - }, - }, - }); - expect(result.attemptStatus).toBe("RETRY_IMMEDIATELY"); - expect(result.snapshot.executionStatus).toBe("PENDING_EXECUTING"); - expect(result.run.status).toBe("RETRYING_AFTER_FAILURE"); - - //state should be completed - const executionData3 = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData3); - expect(executionData3.snapshot.executionStatus).toBe("PENDING_EXECUTING"); - //only when the new attempt is created, should the attempt be increased - expect(executionData3.run.attemptNumber).toBe(1); - expect(executionData3.run.status).toBe("RETRYING_AFTER_FAILURE"); - - //create a second attempt - const attemptResult2 = await engine.startRunAttempt({ - runId: dequeued[0].run.id, - snapshotId: executionData3.snapshot.id, - }); - expect(attemptResult2.run.attemptNumber).toBe(2); - - //now complete it successfully - const result2 = await engine.completeRunAttempt({ - runId: dequeued[0].run.id, - snapshotId: attemptResult2.snapshot.id, - completion: { - ok: true, - id: dequeued[0].run.id, - output: `{"foo":"bar"}`, - outputType: "application/json", - }, - }); - expect(result2.snapshot.executionStatus).toBe("FINISHED"); - 
expect(result2.run.attemptNumber).toBe(2); - expect(result2.run.status).toBe("COMPLETED_SUCCESSFULLY"); - - //waitpoint should have been completed, with the output - const runWaitpointAfter = await prisma.waitpoint.findMany({ - where: { - completedByTaskRunId: run.id, - }, - }); - expect(runWaitpointAfter.length).toBe(1); - expect(runWaitpointAfter[0].type).toBe("RUN"); - expect(runWaitpointAfter[0].output).toBe(`{"foo":"bar"}`); - expect(runWaitpointAfter[0].outputIsError).toBe(false); - - //state should be completed - const executionData4 = await engine.getRunExecutionData({ runId: run.id }); - assertNonNullable(executionData4); - expect(executionData4.snapshot.executionStatus).toBe("FINISHED"); - expect(executionData4.run.attemptNumber).toBe(2); - expect(executionData4.run.status).toBe("COMPLETED_SUCCESSFULLY"); - } finally { - engine.quit(); - } - } - ); }); diff --git a/internal-packages/testcontainers/src/setup.ts b/internal-packages/testcontainers/src/setup.ts index f79f2f1ef7..ffd9d86e1b 100644 --- a/internal-packages/testcontainers/src/setup.ts +++ b/internal-packages/testcontainers/src/setup.ts @@ -68,7 +68,8 @@ export async function setupBackgroundWorker( prisma: PrismaClient, environment: AuthenticatedEnvironment, taskIdentifier: string | string[], - machineConfig?: MachineConfig + machineConfig?: MachineConfig, + retryOptions?: RetryOptions ) { const worker = await prisma.backgroundWorker.create({ data: { @@ -86,7 +87,7 @@ export async function setupBackgroundWorker( const tasks: BackgroundWorkerTask[] = []; for (const identifier of taskIdentifiers) { - const retryConfig: RetryOptions = { + const retryConfig: RetryOptions = retryOptions ?? { maxAttempts: 3, factor: 1, minTimeoutInMs: 100, diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 59643b5db2..052da5bce9 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -19,6 +19,7 @@ import { ChildProcess, fork } from "node:child_process"; import { chalkError, chalkGrey, chalkRun, prettyPrintDate } from "../utilities/cliOutput.js"; import { execOptionsForRuntime, execPathForRuntime } from "@trigger.dev/core/v3/build"; +import { nodeOptionsWithMaxOldSpaceSize } from "@trigger.dev/core/v3/machines"; import { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket"; import { logger } from "../utilities/logger.js"; import { @@ -117,14 +118,16 @@ export class TaskRunProcess { } async initialize() { - const { env: $env, workerManifest, cwd, messageId } = this.options; + const { env: $env, workerManifest, cwd, messageId, payload } = this.options; + + const maxOldSpaceSize = nodeOptionsWithMaxOldSpaceSize(undefined, payload.execution.machine); const fullEnv = { ...(this.isTest ? { TRIGGER_LOG_LEVEL: "debug" } : {}), ...$env, OTEL_IMPORT_HOOK_INCLUDES: workerManifest.otelImportHook?.include?.join(","), // TODO: this will probably need to use something different for bun (maybe --preload?) 
- NODE_OPTIONS: execOptionsForRuntime(workerManifest.runtime, workerManifest), + NODE_OPTIONS: execOptionsForRuntime(workerManifest.runtime, workerManifest, maxOldSpaceSize), PATH: process.env.PATH, TRIGGER_PROCESS_FORK_START_TIME: String(Date.now()), }; diff --git a/packages/core/package.json b/packages/core/package.json index 31900084f8..d3f2d38f2a 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -59,6 +59,7 @@ "./v3/workers": "./src/v3/workers/index.ts", "./v3/schemas": "./src/v3/schemas/index.ts", "./v3/runEngineWorker": "./src/v3/runEngineWorker/index.ts", + "./v3/machines": "./src/v3/machines/index.ts", "./v3/serverOnly": "./src/v3/serverOnly/index.ts" }, "sourceDialects": [ @@ -178,6 +179,9 @@ "v3/runEngineWorker": [ "dist/commonjs/v3/runEngineWorker/index.d.ts" ], + "v3/machines": [ + "dist/commonjs/v3/machines/index.d.ts" + ], "v3/serverOnly": [ "dist/commonjs/v3/serverOnly/index.d.ts" ] @@ -663,6 +667,17 @@ "default": "./dist/commonjs/v3/runEngineWorker/index.js" } }, + "./v3/machines": { + "import": { + "@triggerdotdev/source": "./src/v3/machines/index.ts", + "types": "./dist/esm/v3/machines/index.d.ts", + "default": "./dist/esm/v3/machines/index.js" + }, + "require": { + "types": "./dist/commonjs/v3/machines/index.d.ts", + "default": "./dist/commonjs/v3/machines/index.js" + } + }, "./v3/serverOnly": { "import": { "@triggerdotdev/source": "./src/v3/serverOnly/index.ts", diff --git a/packages/core/src/v3/build/flags.test.ts b/packages/core/src/v3/build/flags.test.ts new file mode 100644 index 0000000000..aaa3149bea --- /dev/null +++ b/packages/core/src/v3/build/flags.test.ts @@ -0,0 +1,53 @@ +import { describe, it, expect } from "vitest"; +import { dedupFlags } from "./flags.js"; + +describe("dedupFlags", () => { + it("should keep single flags unchanged", () => { + expect(dedupFlags("--verbose")).toBe("--verbose"); + expect(dedupFlags("-v")).toBe("-v"); + expect(dedupFlags("--debug=true")).toBe("--debug=true"); + }); + + it("should preserve multiple different flags", () => { + expect(dedupFlags("--quiet --verbose")).toBe("--quiet --verbose"); + expect(dedupFlags("-v -q --log=debug")).toBe("-v -q --log=debug"); + }); + + it("should use last value when flags are duplicated", () => { + expect(dedupFlags("--debug=false --debug=true")).toBe("--debug=true"); + expect(dedupFlags("--log=info --log=warn --log=error")).toBe("--log=error"); + }); + + it("should handle mix of flags with and without values", () => { + expect(dedupFlags("--debug=false -v --debug=true")).toBe("-v --debug=true"); + expect(dedupFlags("-v --quiet -v")).toBe("--quiet -v"); + }); + + // Edge cases + it("should handle empty string", () => { + expect(dedupFlags("")).toBe(""); + }); + + it("should handle multiple spaces between flags", () => { + expect(dedupFlags("--debug=false  --verbose  --debug=true")).toBe("--verbose --debug=true"); + }); + + it("should handle flags with empty values", () => { + expect(dedupFlags("--path= --path=foo")).toBe("--path=foo"); + expect(dedupFlags("--path=foo --path=")).toBe("--path="); + }); + + it("should preserve values containing equals signs", () => { + expect(dedupFlags("--url=http://example.com?foo=bar")).toBe("--url=http://example.com?foo=bar"); + }); + + it("should handle flags with special characters", () => { + expect(dedupFlags("--path=/usr/local --path=/home/user")).toBe("--path=/home/user"); + expect(dedupFlags('--name="John Doe" --name="Jane Doe"')).toBe('--name="Jane Doe"'); + }); + + it("should handle multiple flag variants", () => { + const 
input = "--env=dev -v --env=prod --quiet -v --env=staging"; + expect(dedupFlags(input)).toBe("--quiet -v --env=staging"); + }); +}); diff --git a/packages/core/src/v3/build/flags.ts b/packages/core/src/v3/build/flags.ts new file mode 100644 index 0000000000..88448fa730 --- /dev/null +++ b/packages/core/src/v3/build/flags.ts @@ -0,0 +1,47 @@ +/** + * Deduplicates command line flags by keeping only the last occurrence of each flag. + * Preserves the order of the last occurrence of each flag. + * + * @param flags - A space-separated string of command line flags + * @returns A space-separated string of deduplicated flags + * + * @example + * // Single flags are preserved + * dedupFlags("--quiet --verbose") // returns "--quiet --verbose" + * + * @example + * // For duplicate flags, the last value wins and maintains its position + * dedupFlags("--debug=false --log=info --debug=true") // returns "--log=info --debug=true" + * + * @example + * // Mixing flags with and without values + * dedupFlags("-v --log=debug -v") // returns "--log=debug -v" + */ +export function dedupFlags(flags: string): string { + const seen = new Set<string>(); + const result: [string, string | boolean][] = []; + + const pairs = flags + .split(" ") + .filter(Boolean) // Remove empty strings from multiple spaces + .map((flag): [string, string | boolean] => { + const equalIndex = flag.indexOf("="); + if (equalIndex !== -1) { + const key = flag.slice(0, equalIndex); + const value = flag.slice(equalIndex + 1); + return [key, value]; + } else { + return [flag, true]; + } + }); + + // Process in reverse to keep last occurrence + for (const [key, value] of pairs.reverse()) { + if (!seen.has(key)) { + seen.add(key); + result.unshift([key, value]); + } + } + + return result.map(([key, value]) => (value === true ? key : `${key}=${value}`)).join(" "); +} diff --git a/packages/core/src/v3/build/index.ts b/packages/core/src/v3/build/index.ts index 38245d4798..c29a82da2e 100644 --- a/packages/core/src/v3/build/index.ts +++ b/packages/core/src/v3/build/index.ts @@ -2,3 +2,4 @@ export * from "./extensions.js"; export * from "./resolvedConfig.js"; export * from "./runtime.js"; export * from "./externals.js"; +export * from "./flags.js"; diff --git a/packages/core/src/v3/build/runtime.ts b/packages/core/src/v3/build/runtime.ts index 94b3fac08c..dc8ce19c6c 100644 --- a/packages/core/src/v3/build/runtime.ts +++ b/packages/core/src/v3/build/runtime.ts @@ -1,6 +1,7 @@ import { join } from "node:path"; import { pathToFileURL } from "url"; import { BuildRuntime } from "../schemas/build.js"; +import { dedupFlags } from "./flags.js"; export const DEFAULT_RUNTIME = "node" satisfies BuildRuntime; @@ -41,7 +42,11 @@ export type ExecOptions = { customConditions?: string[]; }; -export function execOptionsForRuntime(runtime: BuildRuntime, options: ExecOptions): string { +export function execOptionsForRuntime( + runtime: BuildRuntime, + options: ExecOptions, + additionalNodeOptions?: string +): string { switch (runtime) { case "node": case "node-22": { @@ -51,15 +56,19 @@ export function execOptionsForRuntime(runtime: BuildRuntime, options: ExecOption const conditions = options.customConditions?.map((condition) => `--conditions=${condition}`); - return [ + //later flags will win (after the dedupe) + const flags = [ + process.env.NODE_OPTIONS, + additionalNodeOptions, importEntryPoint, conditions, - process.env.NODE_OPTIONS, nodeRuntimeNeedsGlobalWebCryptoFlag() ? 
"--experimental-global-webcrypto" : undefined, ] .filter(Boolean) .flat() .join(" "); + + return dedupFlags(flags); } case "bun": { return ""; diff --git a/packages/core/src/v3/errors.ts b/packages/core/src/v3/errors.ts index 88c089025f..a8e789f81d 100644 --- a/packages/core/src/v3/errors.ts +++ b/packages/core/src/v3/errors.ts @@ -76,6 +76,58 @@ export function isManualOutOfMemoryError(error: TaskRunError) { return false; } +export function isOOMRunError(error: TaskRunError) { + if (error.type === "INTERNAL_ERROR") { + if ( + error.code === "TASK_PROCESS_OOM_KILLED" || + error.code === "TASK_PROCESS_MAYBE_OOM_KILLED" + ) { + return true; + } + + // For the purposes of retrying on a larger machine, we're going to treat this as an OOM error. + // This is what they look like if we're executing using k8s. They then get corrected later, but it's too late. + // {"code": "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE", "type": "INTERNAL_ERROR", "message": "Process exited with code -1 after signal SIGKILL."} + if ( + error.code === "TASK_PROCESS_EXITED_WITH_NON_ZERO_CODE" && + error.message && + error.message.includes("-1") + ) { + if (error.message.includes("SIGKILL")) { + return true; + } + + if (error.message.includes("SIGABRT") && error.stackTrace) { + const oomIndicators = [ + "JavaScript heap out of memory", + "Reached heap limit", + "FATAL ERROR: Reached heap limit Allocation failed", + ]; + + if (oomIndicators.some((indicator) => error.stackTrace!.includes(indicator))) { + return true; + } + } + } + } + + if (error.type === "BUILT_IN_ERROR") { + // ffmpeg also does weird stuff + // { "name": "Error", "type": "BUILT_IN_ERROR", "message": "ffmpeg was killed with signal SIGKILL" } + if (error.message && error.message.includes("ffmpeg was killed with signal SIGKILL")) { + return true; + } + } + + // Special `OutOfMemoryError` for doing a manual OOM kill. 
+ // Useful if a native library does an OOM but doesn't actually crash the run and you want to manually fail the run as an OOM. + if (isManualOutOfMemoryError(error)) { + return true; + } + + return false; +} + export class TaskPayloadParsedError extends Error { public readonly cause: unknown; @@ -562,6 +614,8 @@ const findSignalInMessage = (message?: string, truncateLength = 100) => { return "SIGSEGV"; } else if (trunc.includes("SIGKILL")) { return "SIGKILL"; + } else if (trunc.includes("SIGABRT")) { + return "SIGABRT"; } else { return; } @@ -587,6 +641,10 @@ export function taskRunErrorEnhancer(error: TaskRunError): EnhanceError<TaskRunError> { const testMachine: MachinePreset = { + name: "small-2x", + memory: 1, // 1GB = 1024 MiB + cpu: 1, + centsPerMs: 0, + }; + + // With default 0.2 overhead, max-old-space-size should be 819 (1024 * 0.8) + const expectedFlag = "--max-old-space-size=819"; + + it("handles undefined NODE_OPTIONS", () => { + const result = nodeOptionsWithMaxOldSpaceSize(undefined, testMachine); + expect(result).toBe(expectedFlag); + }); + + it("handles empty string NODE_OPTIONS", () => { + const result = nodeOptionsWithMaxOldSpaceSize("", testMachine); + expect(result).toBe(expectedFlag); + }); + + it("preserves existing flags while adding max-old-space-size", () => { + const result = nodeOptionsWithMaxOldSpaceSize("--inspect --trace-warnings", testMachine); + expect(result).toBe(`--inspect --trace-warnings ${expectedFlag}`); + }); + + it("replaces existing max-old-space-size flag", () => { + const result = nodeOptionsWithMaxOldSpaceSize( + "--max-old-space-size=4096 --inspect", + testMachine + ); + expect(result).toBe(`--inspect ${expectedFlag}`); + }); + + it("handles multiple existing max-old-space-size flags", () => { + const result = nodeOptionsWithMaxOldSpaceSize( + "--max-old-space-size=4096 --inspect --max-old-space-size=8192", + testMachine + ); + expect(result).toBe(`--inspect ${expectedFlag}`); + }); + + it("handles extra spaces between flags", () => { + const result = nodeOptionsWithMaxOldSpaceSize("--inspect  --trace-warnings", testMachine); + expect(result).toBe(`--inspect --trace-warnings ${expectedFlag}`); + }); + + it("uses custom overhead value", () => { + const result = nodeOptionsWithMaxOldSpaceSize("--inspect", testMachine, 0.5); + // With 0.5 overhead, max-old-space-size should be 512 (1024 * 0.5) + expect(result).toBe("--inspect --max-old-space-size=512"); + }); +}); diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index cf82719f76..fbfd4e97e7 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -308,7 +308,7 @@ export const TaskRunExecution = z.object({ organization: TaskRunExecutionOrganization, project: TaskRunExecutionProject, batch: TaskRunExecutionBatch.optional(), - machine: MachinePreset.optional(), + machine: MachinePreset, }); export type TaskRunExecution = z.infer<typeof TaskRunExecution>; diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 758c27056e..8db9c42ae0 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -2,7 +2,7 @@ import { defineConfig } from "vitest/config"; export default defineConfig({ test: { - include: ["test/**/*.test.ts"], + include: ["test/**/*.test.ts", "src/v3/**/*.test.ts"], globals: true, }, });
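A quick note on how the NODE_OPTIONS changes above compose (an illustrative sketch, not part of the patch): `nodeOptionsWithMaxOldSpaceSize` derives a `--max-old-space-size` flag from the run's machine preset, `taskRunProcess.ts` hands it to `execOptionsForRuntime` as `additionalNodeOptions`, and because that value is placed after `process.env.NODE_OPTIONS` in the flag list, `dedupFlags` makes it win. Using only the exports this PR wires up (`dedupFlags` from `@trigger.dev/core/v3/build`) and made-up flag values:

```ts
import { dedupFlags } from "@trigger.dev/core/v3/build";

// User-supplied NODE_OPTIONS first, machine-derived flag second, mirroring
// the order execOptionsForRuntime builds its flag list in. Later duplicates win:
const userNodeOptions = "--max-old-space-size=4096 --enable-source-maps"; // hypothetical user env
const machineNodeOptions = "--max-old-space-size=819"; // small-2x: 1024 MiB * (1 - 0.2 overhead)

console.log(dedupFlags(`${userNodeOptions} ${machineNodeOptions}`));
// => "--enable-source-maps --max-old-space-size=819"
```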