diff --git a/.changeset/plenty-dolphins-act.md b/.changeset/plenty-dolphins-act.md new file mode 100644 index 0000000000..59d2c7fc44 --- /dev/null +++ b/.changeset/plenty-dolphins-act.md @@ -0,0 +1,8 @@ +--- +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +- Correctly resolve waitpoints that come in early +- Ensure correct state before requesting suspension +- Fix race conditions in snapshot processing diff --git a/.changeset/sweet-dolphins-invent.md b/.changeset/sweet-dolphins-invent.md new file mode 100644 index 0000000000..df758a89e9 --- /dev/null +++ b/.changeset/sweet-dolphins-invent.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Always print full deploy logs in CI diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 2dcf329736..cc41e2bfbf 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -17,6 +17,7 @@ import { WorkloadRunAttemptStartRequestBody, type WorkloadRunAttemptStartResponseBody, type WorkloadRunLatestSnapshotResponseBody, + WorkloadRunSnapshotsSinceResponseBody, type WorkloadServerToClientEvents, type WorkloadSuspendRunResponseBody, } from "@trigger.dev/core/v3/workers"; @@ -341,6 +342,31 @@ export class WorkloadServer extends EventEmitter { } satisfies WorkloadRunLatestSnapshotResponseBody); }, }) + .route( + "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/since/:snapshotFriendlyId", + "GET", + { + paramsSchema: WorkloadActionParams, + handler: async ({ req, reply, params }) => { + const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince( + params.runFriendlyId, + params.snapshotFriendlyId, + this.runnerIdFromRequest(req) + ); + + if (!sinceSnapshotResponse.success) { + console.error("Failed to get snapshots since", { + runId: params.runFriendlyId, + error: sinceSnapshotResponse.error, + }); + reply.empty(500); + return; + } + + reply.json(sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody); + }, + } + ) .route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", { paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }), bodySchema: WorkloadDebugLogRequestBody, diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.since.$snapshotId.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.since.$snapshotId.ts new file mode 100644 index 0000000000..a79de5869a --- /dev/null +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.since.$snapshotId.ts @@ -0,0 +1,30 @@ +import { json, TypedResponse } from "@remix-run/server-runtime"; +import { WorkerApiRunSnapshotsSinceResponseBody } from "@trigger.dev/core/v3/workers"; +import { z } from "zod"; +import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server"; + +export const loader = createLoaderWorkerApiRoute( + { + params: z.object({ + runFriendlyId: z.string(), + snapshotId: z.string(), + }), + }, + async ({ + authenticatedWorker, + params, + }): Promise> => { + const { runFriendlyId, snapshotId } = params; + + const snapshots = await authenticatedWorker.getSnapshotsSince({ + runFriendlyId, + snapshotId, + }); + + if (!snapshots) { + throw new Error("Failed to retrieve snapshots since given snapshot"); + } + + return json({ snapshots }); + } +); diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index 1a71157289..2767e613db 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -401,7 +401,7 @@ export function registerRunEngineEventBusHandlers() { engine.eventBus.on("executionSnapshotCreated", async ({ time, run, snapshot }) => { const eventResult = await recordRunDebugLog( run.id, - `${snapshot.executionStatus} - ${snapshot.description}`, + `[engine] ${snapshot.executionStatus} - ${snapshot.description}`, { attributes: { properties: { @@ -450,6 +450,7 @@ export function registerRunEngineEventBusHandlers() { // Record notification event const eventResult = await recordRunDebugLog( run.id, + // don't prefix this with [engine] - "run:notify" is the correct prefix `run:notify platform -> supervisor: ${snapshot.executionStatus}`, { attributes: { @@ -479,6 +480,7 @@ export function registerRunEngineEventBusHandlers() { // Record notification event const eventResult = await recordRunDebugLog( run.id, + // don't prefix this with [engine] - "run:notify" is the correct prefix `run:notify ERROR platform -> supervisor: ${snapshot.executionStatus}`, { attributes: { @@ -505,7 +507,7 @@ export function registerRunEngineEventBusHandlers() { engine.eventBus.on("incomingCheckpointDiscarded", async ({ time, run, snapshot, checkpoint }) => { const eventResult = await recordRunDebugLog( run.id, - `Checkpoint discarded: ${checkpoint.discardReason}`, + `[engine] Checkpoint discarded: ${checkpoint.discardReason}`, { attributes: { properties: { diff --git a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts index d7a8a10d7d..5c3a8d1cce 100644 --- a/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts +++ b/apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts @@ -759,6 +759,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine { }); } + async getSnapshotsSince({ + runFriendlyId, + snapshotId, + }: { + runFriendlyId: string; + snapshotId: string; + }) { + return await this._engine.getSnapshotsSince({ + runId: fromFriendlyId(runFriendlyId), + snapshotId: fromFriendlyId(snapshotId), + }); + } + toJSON(): WorkerGroupTokenAuthenticationResponse { if (this.type === WorkerInstanceGroupType.MANAGED) { return { diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index f1102a195c..9361a4d366 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -43,6 +43,8 @@ import { EnqueueSystem } from "./systems/enqueueSystem.js"; import { ExecutionSnapshotSystem, getLatestExecutionSnapshot, + getExecutionSnapshotsSince, + executionDataFromSnapshot, } from "./systems/executionSnapshotSystem.js"; import { PendingVersionSystem } from "./systems/pendingVersionSystem.js"; import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js"; @@ -1100,43 +1102,31 @@ export class RunEngine { const prisma = tx ?? this.prisma; try { const snapshot = await getLatestExecutionSnapshot(prisma, runId); + return executionDataFromSnapshot(snapshot); + } catch (e) { + this.logger.error("Failed to getRunExecutionData", { + message: e instanceof Error ? e.message : e, + }); + return null; + } + } - const executionData: RunExecutionData = { - version: "1" as const, - snapshot: { - id: snapshot.id, - friendlyId: snapshot.friendlyId, - executionStatus: snapshot.executionStatus, - description: snapshot.description, - }, - run: { - id: snapshot.runId, - friendlyId: snapshot.runFriendlyId, - status: snapshot.runStatus, - attemptNumber: snapshot.attemptNumber ?? undefined, - }, - batch: snapshot.batchId - ? { - id: snapshot.batchId, - friendlyId: BatchId.toFriendlyId(snapshot.batchId), - } - : undefined, - checkpoint: snapshot.checkpoint - ? { - id: snapshot.checkpoint.id, - friendlyId: snapshot.checkpoint.friendlyId, - type: snapshot.checkpoint.type, - location: snapshot.checkpoint.location, - imageRef: snapshot.checkpoint.imageRef, - reason: snapshot.checkpoint.reason ?? undefined, - } - : undefined, - completedWaitpoints: snapshot.completedWaitpoints, - }; + async getSnapshotsSince({ + runId, + snapshotId, + tx, + }: { + runId: string; + snapshotId: string; + tx?: PrismaClientOrTransaction; + }): Promise { + const prisma = tx ?? this.prisma; - return executionData; + try { + const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId); + return snapshots.map(executionDataFromSnapshot); } catch (e) { - this.logger.error("Failed to getRunExecutionData", { + this.logger.error("Failed to getSnapshotsSince", { message: e instanceof Error ? e.message : e, }); return null; @@ -1158,9 +1148,6 @@ export class RunEngine { } } - //#endregion - - //#region Heartbeat async #handleStalledSnapshot({ runId, snapshotId, diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 9610c1accc..0e5800fa9a 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -409,6 +409,7 @@ export class DequeueSystem { friendlyId: newSnapshot.friendlyId, executionStatus: newSnapshot.executionStatus, description: newSnapshot.description, + createdAt: newSnapshot.createdAt, }, image: result.deployment?.imageReference ?? undefined, checkpoint: newSnapshot.checkpoint ?? undefined, diff --git a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts index 25320697b0..314fb23f8e 100644 --- a/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts @@ -1,4 +1,4 @@ -import { CompletedWaitpoint, ExecutionResult } from "@trigger.dev/core/v3"; +import { CompletedWaitpoint, ExecutionResult, RunExecutionData } from "@trigger.dev/core/v3"; import { BatchId, RunId, SnapshotId } from "@trigger.dev/core/v3/isomorphic"; import { Prisma, @@ -17,31 +17,23 @@ export type ExecutionSnapshotSystemOptions = { heartbeatTimeouts: HeartbeatTimeouts; }; -export interface LatestExecutionSnapshot extends TaskRunExecutionSnapshot { +export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot { friendlyId: string; runFriendlyId: string; checkpoint: TaskRunCheckpoint | null; completedWaitpoints: CompletedWaitpoint[]; } -/* Gets the most recent valid snapshot for a run */ -export async function getLatestExecutionSnapshot( - prisma: PrismaClientOrTransaction, - runId: string -): Promise { - const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({ - where: { runId, isValid: true }, - include: { - completedWaitpoints: true, - checkpoint: true, - }, - orderBy: { createdAt: "desc" }, - }); - - if (!snapshot) { - throw new Error(`No execution snapshot found for TaskRun ${runId}`); - } +type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{ + include: { + checkpoint: true; + completedWaitpoints: true; + }; +}>; +function enhanceExecutionSnapshot( + snapshot: ExecutionSnapshotWithCheckAndWaitpoints +): EnhancedExecutionSnapshot { return { ...snapshot, friendlyId: SnapshotId.toFriendlyId(snapshot.id), @@ -99,6 +91,27 @@ export async function getLatestExecutionSnapshot( }; } +/* Gets the most recent valid snapshot for a run */ +export async function getLatestExecutionSnapshot( + prisma: PrismaClientOrTransaction, + runId: string +): Promise { + const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({ + where: { runId, isValid: true }, + include: { + completedWaitpoints: true, + checkpoint: true, + }, + orderBy: { createdAt: "desc" }, + }); + + if (!snapshot) { + throw new Error(`No execution snapshot found for TaskRun ${runId}`); + } + + return enhanceExecutionSnapshot(snapshot); +} + export async function getExecutionSnapshotCompletedWaitpoints( prisma: PrismaClientOrTransaction, snapshotId: string @@ -131,6 +144,7 @@ export function executionResultFromSnapshot(snapshot: TaskRunExecutionSnapshot): friendlyId: SnapshotId.toFriendlyId(snapshot.id), executionStatus: snapshot.executionStatus, description: snapshot.description, + createdAt: snapshot.createdAt, }, run: { id: snapshot.runId, @@ -141,6 +155,73 @@ export function executionResultFromSnapshot(snapshot: TaskRunExecutionSnapshot): }; } +export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): RunExecutionData { + return { + version: "1" as const, + snapshot: { + id: snapshot.id, + friendlyId: snapshot.friendlyId, + executionStatus: snapshot.executionStatus, + description: snapshot.description, + createdAt: snapshot.createdAt, + }, + run: { + id: snapshot.runId, + friendlyId: snapshot.runFriendlyId, + status: snapshot.runStatus, + attemptNumber: snapshot.attemptNumber ?? undefined, + }, + batch: snapshot.batchId + ? { + id: snapshot.batchId, + friendlyId: BatchId.toFriendlyId(snapshot.batchId), + } + : undefined, + checkpoint: snapshot.checkpoint + ? { + id: snapshot.checkpoint.id, + friendlyId: snapshot.checkpoint.friendlyId, + type: snapshot.checkpoint.type, + location: snapshot.checkpoint.location, + imageRef: snapshot.checkpoint.imageRef, + reason: snapshot.checkpoint.reason ?? undefined, + } + : undefined, + completedWaitpoints: snapshot.completedWaitpoints, + }; +} + +export async function getExecutionSnapshotsSince( + prisma: PrismaClientOrTransaction, + runId: string, + sinceSnapshotId: string +): Promise { + // Find the createdAt of the sinceSnapshotId + const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({ + where: { id: sinceSnapshotId }, + select: { createdAt: true }, + }); + + if (!sinceSnapshot) { + throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`); + } + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { + runId, + isValid: true, + createdAt: { gt: sinceSnapshot.createdAt }, + }, + include: { + completedWaitpoints: true, + checkpoint: true, + }, + orderBy: { createdAt: "asc" }, + }); + + return snapshots.map(enhanceExecutionSnapshot); +} + export class ExecutionSnapshotSystem { private readonly $: SystemResources; private readonly heartbeatTimeouts: HeartbeatTimeouts; diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 0ee1af5576..2150f70ac9 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -892,6 +892,7 @@ export class RunAttemptSystem { friendlyId: newSnapshot.friendlyId, executionStatus: newSnapshot.executionStatus, description: newSnapshot.description, + createdAt: newSnapshot.createdAt, }, run: { id: newSnapshot.runId, diff --git a/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts b/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts index 3e4ae20afa..4beadd6a74 100644 --- a/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts +++ b/internal-packages/run-engine/src/engine/tests/waitpoints.test.ts @@ -1344,4 +1344,148 @@ describe("RunEngine Waitpoints", () => { } } ); + + containerTest( + "getSnapshotsSince returns correct snapshots and handles errors", + async ({ prisma, redisOptions }) => { + //create environment + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + const run = await engine.trigger( + { + number: 1, + friendlyId: "run_snapshotsince", + environment: authenticatedEnvironment, + taskIdentifier, + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t_snapshotsince", + spanId: "s_snapshotsince", + masterQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + }, + prisma + ); + + // Dequeue and start the run (snapshot 1) + const dequeued = await engine.dequeueFromMasterQueue({ + consumerId: "test_snapshotsince", + masterQueue: run.masterQueue, + maxRunCount: 10, + }); + const attemptResult = await engine.startRunAttempt({ + runId: dequeued[0].run.id, + snapshotId: dequeued[0].snapshot.id, + }); + + // Block the run with a waitpoint (snapshot 2) + const { waitpoint } = await engine.createDateTimeWaitpoint({ + projectId: authenticatedEnvironment.project.id, + environmentId: authenticatedEnvironment.id, + completedAfter: new Date(Date.now() + 100), + }); + await engine.blockRunWithWaitpoint({ + runId: run.id, + waitpoints: [waitpoint.id], + projectId: authenticatedEnvironment.project.id, + organizationId: authenticatedEnvironment.organization.id, + releaseConcurrency: true, + }); + + // Wait for the waitpoint to complete and unblock (snapshot 3) + await setTimeout(200); + await engine.completeWaitpoint({ id: waitpoint.id }); + await setTimeout(200); + + // Get all snapshots for the run + const allSnapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId: run.id, isValid: true }, + orderBy: { createdAt: "asc" }, + }); + expect(allSnapshots.length).toBeGreaterThanOrEqual(3); + + // getSnapshotsSince with the first snapshot should return at least 2 + const sinceFirst = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: allSnapshots[0].id, + }); + assertNonNullable(sinceFirst); + expect(sinceFirst.length).toBeGreaterThanOrEqual(2); + + // Check completedWaitpoints for each returned snapshot + for (const snap of sinceFirst) { + expect(Array.isArray(snap.completedWaitpoints)).toBe(true); + } + + // At least one snapshot should have a completed waitpoint + expect(sinceFirst.some((snap) => snap.completedWaitpoints.length === 1)).toBe(true); + + // If any completedWaitpoints exist, check output is not an error + const withCompleted = sinceFirst.find((snap) => snap.completedWaitpoints.length === 1); + if (withCompleted) { + expect(withCompleted.completedWaitpoints[0].outputIsError).toBe(false); + } + + // getSnapshotsSince with the latest snapshot should return 0 + const sinceLatest = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: allSnapshots[allSnapshots.length - 1].id, + }); + assertNonNullable(sinceLatest); + expect(sinceLatest.length).toBe(0); + + // getSnapshotsSince with an invalid snapshotId should throw or return [] + let threw = false; + try { + const sinceInvalid = await engine.getSnapshotsSince({ + runId: run.id, + snapshotId: "invalid-id", + }); + expect(sinceInvalid).toBeNull(); + } catch (e) { + threw = true; + } + // should never throw + expect(threw).toBe(false); + } finally { + await engine.quit(); + } + } + ); }); diff --git a/packages/cli-v3/package.json b/packages/cli-v3/package.json index c09a1f8bc3..294d675d12 100644 --- a/packages/cli-v3/package.json +++ b/packages/cli-v3/package.json @@ -39,6 +39,9 @@ "esm" ], "project": "./tsconfig.src.json", + "exclude": [ + "**/*.test.ts" + ], "exports": { "./package.json": "./package.json", ".": "./src/index.ts" @@ -70,6 +73,7 @@ "typecheck": "tsc -p tsconfig.src.json --noEmit", "build": "tshy && pnpm run update-version", "dev": "tshy --watch", + "test": "vitest", "test:e2e": "vitest --run -c ./e2e/vitest.config.ts", "update-version": "tsx ../../scripts/updateVersion.ts" }, diff --git a/packages/cli-v3/src/commands/deploy.ts b/packages/cli-v3/src/commands/deploy.ts index 35042c6c1b..416404df3f 100644 --- a/packages/cli-v3/src/commands/deploy.ts +++ b/packages/cli-v3/src/commands/deploy.ts @@ -1,10 +1,11 @@ -import { intro, outro } from "@clack/prompts"; +import { intro, log, outro } from "@clack/prompts"; import { prepareDeploymentError } from "@trigger.dev/core/v3"; import { InitializeDeploymentResponseBody } from "@trigger.dev/core/v3/schemas"; import { Command, Option as CommandOption } from "commander"; import { resolve } from "node:path"; import { x } from "tinyexec"; import { z } from "zod"; +import { isCI } from "std-env"; import { CliApiClient } from "../apiClient.js"; import { buildWorker } from "../build/buildWorker.js"; import { resolveAlwaysExternal } from "../build/externals.js"; @@ -339,24 +340,24 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { const version = deployment.version; - const deploymentLink = cliLink( - "View deployment", - `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}` - ); + const rawDeploymentLink = `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/deployments/${deployment.shortCode}`; + const rawTestLink = `${authorization.dashboardUrl}/projects/v3/${ + resolvedConfig.project + }/test?environment=${options.env === "prod" ? "prod" : "stg"}`; - const testLink = cliLink( - "Test tasks", - `${authorization.dashboardUrl}/projects/v3/${resolvedConfig.project}/test?environment=${ - options.env === "prod" ? "prod" : "stg" - }` - ); + const deploymentLink = cliLink("View deployment", rawDeploymentLink); + const testLink = cliLink("Test tasks", rawTestLink); const $spinner = spinner(); - if (isLinksSupported) { - $spinner.start(`Building version ${version} ${deploymentLink}`); + if (isCI) { + log.step(`Building version ${version}\n`); } else { - $spinner.start(`Building version ${version}`); + if (isLinksSupported) { + $spinner.start(`Building version ${version} ${deploymentLink}`); + } else { + $spinner.start(`Building version ${version}`); + } } const selfHostedRegistryHost = deployment.registryHost ?? options.registry; @@ -386,6 +387,11 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { buildEnvVars: buildManifest.build.env, network: options.network, onLog: (logMessage) => { + if (isCI) { + console.log(logMessage); + return; + } + if (isLinksSupported) { $spinner.message(`Building version ${version} ${deploymentLink}: ${logMessage}`); } else { @@ -459,10 +465,14 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { }` : `${buildResult.image}${buildResult.digest ? `@${buildResult.digest}` : ""}`; - if (isLinksSupported) { - $spinner.message(`Deploying version ${version} ${deploymentLink}`); + if (isCI) { + log.step(`Deploying version ${version}\n`); } else { - $spinner.message(`Deploying version ${version}`); + if (isLinksSupported) { + $spinner.message(`Deploying version ${version} ${deploymentLink}`); + } else { + $spinner.message(`Deploying version ${version}`); + } } const finalizeResponse = await projectClient.client.finalizeDeployment( @@ -473,6 +483,11 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { skipPromotion: options.skipPromotion, }, (logMessage) => { + if (isCI) { + console.log(logMessage); + return; + } + if (isLinksSupported) { $spinner.message(`Deploying version ${version} ${deploymentLink}: ${logMessage}`); } else { @@ -493,7 +508,11 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { throw new SkipLoggingError("Failed to finalize deployment"); } - $spinner.stop(`Successfully deployed version ${version}`); + if (isCI) { + log.step(`Successfully deployed version ${version}`); + } else { + $spinner.stop(`Successfully deployed version ${version}`); + } const taskCount = deploymentWithWorker.worker?.tasks.length ?? 0; @@ -503,6 +522,14 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) { }` ); + if (!isLinksSupported) { + console.log("View deployment"); + console.log(rawDeploymentLink); + console.log(); // new line + console.log("Test tasks"); + console.log(rawTestLink); + } + setGithubActionsOutputAndEnvVars({ envVars: { TRIGGER_DEPLOYMENT_VERSION: version, diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 3a5ca0815c..650143118b 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -36,7 +36,7 @@ import { getEnvVar, getNumberEnvVar, logLevels, - ManagedRuntimeManager, + SharedRuntimeManager, OtelTaskLogger, populateEnv, StandardLifecycleHooksManager, @@ -455,12 +455,6 @@ const zodIpc = new ZodIpcConnection({ }); } }, - TASK_RUN_COMPLETED_NOTIFICATION: async () => { - await managedWorkerRuntime.completeWaitpoints([]); - }, - WAIT_COMPLETED_NOTIFICATION: async () => { - await managedWorkerRuntime.completeWaitpoints([]); - }, CANCEL: async ({ timeoutInMs }) => { _isCancelled = true; cancelController.abort("run cancelled"); @@ -473,11 +467,8 @@ const zodIpc = new ZodIpcConnection({ FLUSH: async ({ timeoutInMs }) => { await flushAll(timeoutInMs); }, - WAITPOINT_CREATED: async ({ wait, waitpoint }) => { - managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id); - }, - WAITPOINT_COMPLETED: async ({ waitpoint }) => { - managedWorkerRuntime.completeWaitpoints([waitpoint]); + RESOLVE_WAITPOINT: async ({ waitpoint }) => { + sharedWorkerRuntime.resolveWaitpoints([waitpoint]); }, }, }); @@ -561,8 +552,8 @@ async function flushMetadata(timeoutInMs: number = 10_000) { }; } -const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, showInternalLogs); -runtime.setGlobalRuntimeManager(managedWorkerRuntime); +const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs); +runtime.setGlobalRuntimeManager(sharedWorkerRuntime); process.title = "trigger-managed-worker"; diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 894ed70aa1..9e84130124 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -35,7 +35,7 @@ import { getEnvVar, getNumberEnvVar, logLevels, - ManagedRuntimeManager, + SharedRuntimeManager, OtelTaskLogger, populateEnv, ProdUsageManager, @@ -448,12 +448,6 @@ const zodIpc = new ZodIpcConnection({ }); } }, - TASK_RUN_COMPLETED_NOTIFICATION: async () => { - await managedWorkerRuntime.completeWaitpoints([]); - }, - WAIT_COMPLETED_NOTIFICATION: async () => { - await managedWorkerRuntime.completeWaitpoints([]); - }, FLUSH: async ({ timeoutInMs }, sender) => { await flushAll(timeoutInMs); }, @@ -466,11 +460,8 @@ const zodIpc = new ZodIpcConnection({ } await flushAll(timeoutInMs); }, - WAITPOINT_CREATED: async ({ wait, waitpoint }) => { - managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id); - }, - WAITPOINT_COMPLETED: async ({ waitpoint }) => { - managedWorkerRuntime.completeWaitpoints([waitpoint]); + RESOLVE_WAITPOINT: async ({ waitpoint }) => { + sharedWorkerRuntime.resolveWaitpoints([waitpoint]); }, }, }); @@ -589,9 +580,9 @@ function initializeUsageManager({ timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); } -const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, true); +const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true); -runtime.setGlobalRuntimeManager(managedWorkerRuntime); +runtime.setGlobalRuntimeManager(sharedWorkerRuntime); process.title = "trigger-managed-worker"; diff --git a/packages/cli-v3/src/entryPoints/managed/controller.ts b/packages/cli-v3/src/entryPoints/managed/controller.ts index f73c313d72..a79b670354 100644 --- a/packages/cli-v3/src/entryPoints/managed/controller.ts +++ b/packages/cli-v3/src/entryPoints/managed/controller.ts @@ -8,7 +8,7 @@ import { } from "@trigger.dev/core/v3/workers"; import { io, type Socket } from "socket.io-client"; import { RunnerEnv } from "./env.js"; -import { RunLogger, SendDebugLogOptions } from "./logger.js"; +import { ManagedRunLogger, RunLogger, SendDebugLogOptions } from "./logger.js"; import { EnvObject } from "std-env"; import { RunExecution } from "./execution.js"; import { tryCatch } from "@trigger.dev/core/utils"; @@ -18,7 +18,7 @@ type ManagedRunControllerOptions = { env: EnvObject; }; -type SupervisorSocket = Socket; +export type SupervisorSocket = Socket; export class ManagedRunController { private readonly env: RunnerEnv; @@ -31,6 +31,9 @@ export class ManagedRunController { private warmStartCount = 0; private restoreCount = 0; + private notificationCount = 0; + private lastNotificationAt: Date | null = null; + private currentExecution: RunExecution | null = null; constructor(opts: ManagedRunControllerOptions) { @@ -47,7 +50,7 @@ export class ManagedRunController { projectRef: env.TRIGGER_PROJECT_REF, }); - this.logger = new RunLogger({ + this.logger = new ManagedRunLogger({ httpClient: this.httpClient, env, }); @@ -91,6 +94,8 @@ export class ManagedRunController { return { warmStartCount: this.warmStartCount, restoreCount: this.restoreCount, + notificationCount: this.notificationCount, + lastNotificationAt: this.lastNotificationAt, }; } @@ -188,12 +193,16 @@ export class ManagedRunController { this.currentExecution = null; } + // Remove all run notification listeners just to be safe + this.socket.removeAllListeners("run:notify"); + if (!this.currentExecution || !this.currentExecution.canExecute) { this.currentExecution = new RunExecution({ workerManifest: this.workerManifest, env: this.env, httpClient: this.httpClient, logger: this.logger, + supervisorSocket: this.socket, }); } @@ -224,8 +233,8 @@ export class ManagedRunController { const metrics = this.currentExecution?.metrics; - if (metrics?.restoreCount) { - this.restoreCount += metrics.restoreCount; + if (metrics?.execution?.restoreCount) { + this.restoreCount += metrics.execution.restoreCount; } this.lockedRunExecution = null; @@ -288,6 +297,7 @@ export class ManagedRunController { env: this.env, httpClient: this.httpClient, logger: this.logger, + supervisorSocket: this.socket, }).prepareForExecution({ taskRunEnv: previousTaskRunEnv, }); @@ -384,7 +394,7 @@ export class ManagedRunController { properties: { code }, }); - this.currentExecution?.exit(); + this.currentExecution?.kill().catch(() => {}); process.exit(code); } @@ -392,80 +402,12 @@ export class ManagedRunController { createSupervisorSocket(): SupervisorSocket { const wsUrl = new URL("/workload", this.workerApiUrl); - const socket = io(wsUrl.href, { + const socket: SupervisorSocket = io(wsUrl.href, { transports: ["websocket"], extraHeaders: { [WORKLOAD_HEADERS.DEPLOYMENT_ID]: this.env.TRIGGER_DEPLOYMENT_ID, [WORKLOAD_HEADERS.RUNNER_ID]: this.env.TRIGGER_RUNNER_ID, }, - }) satisfies SupervisorSocket; - - socket.on("run:notify", async ({ version, run }) => { - this.sendDebugLog({ - runId: run.friendlyId, - message: "run:notify received by runner", - properties: { version, runId: run.friendlyId }, - }); - - if (!this.runFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "run:notify: ignoring notification, no local run ID", - properties: { - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - }, - }); - return; - } - - if (run.friendlyId !== this.runFriendlyId) { - this.sendDebugLog({ - runId: run.friendlyId, - message: "run:notify: ignoring notification for different run", - properties: { - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - notificationRunId: run.friendlyId, - }, - }); - return; - } - - const latestSnapshot = await this.httpClient.getRunExecutionData(this.runFriendlyId); - - if (!latestSnapshot.success) { - this.sendDebugLog({ - runId: this.runFriendlyId, - message: "run:notify: failed to get latest snapshot data", - properties: { - currentRunId: this.runFriendlyId, - currentSnapshotId: this.snapshotFriendlyId, - error: latestSnapshot.error, - }, - }); - return; - } - - const runExecutionData = latestSnapshot.data.execution; - - if (!this.currentExecution) { - this.sendDebugLog({ - runId: runExecutionData.run.friendlyId, - message: "handleSnapshotChange: no current execution", - }); - return; - } - - const [error] = await tryCatch(this.currentExecution.handleSnapshotChange(runExecutionData)); - - if (error) { - this.sendDebugLog({ - runId: runExecutionData.run.friendlyId, - message: "handleSnapshotChange: unexpected error", - properties: { error: error.message }, - }); - } }); socket.on("connect", () => { @@ -496,7 +438,7 @@ export class ManagedRunController { }); }); - socket.on("disconnect", (reason, description) => { + socket.on("disconnect", async (reason, description) => { const parseDescription = (): | { description: string; @@ -519,6 +461,30 @@ export class ManagedRunController { }; }; + if (this.currentExecution) { + const currentEnv = { + workerInstanceName: this.env.TRIGGER_WORKER_INSTANCE_NAME, + runnerId: this.env.TRIGGER_RUNNER_ID, + supervisorApiUrl: this.env.TRIGGER_SUPERVISOR_API_URL, + }; + + await this.currentExecution.processEnvOverrides("socket disconnected"); + + const newEnv = { + workerInstanceName: this.env.TRIGGER_WORKER_INSTANCE_NAME, + runnerId: this.env.TRIGGER_RUNNER_ID, + supervisorApiUrl: this.env.TRIGGER_SUPERVISOR_API_URL, + }; + + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Socket disconnected from supervisor - processed env overrides", + properties: { reason, ...parseDescription(), currentEnv, newEnv }, + }); + + return; + } + this.sendDebugLog({ runId: this.runFriendlyId, message: "Socket disconnected from supervisor", @@ -529,16 +495,6 @@ export class ManagedRunController { return socket; } - async cancelAttempt(runId: string) { - this.sendDebugLog({ - runId, - message: "cancelling attempt", - properties: { runId }, - }); - - await this.currentExecution?.cancel(); - } - start() { this.sendDebugLog({ runId: this.runFriendlyId, @@ -567,7 +523,18 @@ export class ManagedRunController { message: "Shutting down", }); - await this.currentExecution?.cancel(); + // Cancel the current execution + const [error] = await tryCatch(this.currentExecution?.cancel()); + + if (error) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Error during shutdown", + properties: { error: String(error) }, + }); + } + + // Close the socket this.socket.close(); } diff --git a/packages/cli-v3/src/entryPoints/managed/env.ts b/packages/cli-v3/src/entryPoints/managed/env.ts index 8f03968084..87b5483354 100644 --- a/packages/cli-v3/src/entryPoints/managed/env.ts +++ b/packages/cli-v3/src/entryPoints/managed/env.ts @@ -22,8 +22,6 @@ const Env = z.object({ // Set at runtime TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`), TRIGGER_ENV_ID: z.string(), - TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts - TRIGGER_SNAPSHOT_ID: z.string().optional(), // This is only useful for cold starts OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), TRIGGER_WARM_START_URL: z.string().optional(), TRIGGER_WARM_START_CONNECTION_TIMEOUT_MS: z.coerce.number().default(30_000), @@ -32,13 +30,14 @@ const Env = z.object({ TRIGGER_MACHINE_MEMORY: z.string().default("0"), TRIGGER_RUNNER_ID: z.string(), TRIGGER_METADATA_URL: z.string().optional(), - TRIGGER_PRE_SUSPEND_WAIT_MS: z.coerce.number().default(200), // Timeline metrics TRIGGER_POD_SCHEDULED_AT_MS: DateEnv, TRIGGER_DEQUEUED_AT_MS: DateEnv, // May be overridden + TRIGGER_RUN_ID: z.string().optional(), // This is set for cold starts and restores + TRIGGER_SNAPSHOT_ID: z.string().optional(), // This is set for cold starts and restores TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]), TRIGGER_SUPERVISOR_API_DOMAIN: z.string(), TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(), @@ -95,12 +94,6 @@ export class RunnerEnv { get TRIGGER_ENV_ID() { return this.env.TRIGGER_ENV_ID; } - get TRIGGER_RUN_ID() { - return this.env.TRIGGER_RUN_ID; - } - get TRIGGER_SNAPSHOT_ID() { - return this.env.TRIGGER_SNAPSHOT_ID; - } get TRIGGER_WARM_START_URL() { return this.env.TRIGGER_WARM_START_URL; } @@ -119,9 +112,6 @@ export class RunnerEnv { get TRIGGER_METADATA_URL() { return this.env.TRIGGER_METADATA_URL; } - get TRIGGER_PRE_SUSPEND_WAIT_MS() { - return this.env.TRIGGER_PRE_SUSPEND_WAIT_MS; - } get TRIGGER_POD_SCHEDULED_AT_MS() { return this.env.TRIGGER_POD_SCHEDULED_AT_MS; } @@ -130,6 +120,12 @@ export class RunnerEnv { } // Overridable values + get TRIGGER_RUN_ID() { + return this.env.TRIGGER_RUN_ID; + } + get TRIGGER_SNAPSHOT_ID() { + return this.env.TRIGGER_SNAPSHOT_ID; + } get TRIGGER_SUCCESS_EXIT_CODE() { return this.env.TRIGGER_SUCCESS_EXIT_CODE; } @@ -167,6 +163,14 @@ export class RunnerEnv { /** Overrides existing env vars with new values */ override(overrides: Metadata) { + if (overrides.TRIGGER_RUN_ID) { + this.env.TRIGGER_RUN_ID = overrides.TRIGGER_RUN_ID; + } + + if (overrides.TRIGGER_SNAPSHOT_ID) { + this.env.TRIGGER_SNAPSHOT_ID = overrides.TRIGGER_SNAPSHOT_ID; + } + if (overrides.TRIGGER_SUCCESS_EXIT_CODE) { this.env.TRIGGER_SUCCESS_EXIT_CODE = overrides.TRIGGER_SUCCESS_EXIT_CODE; } diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index 927ed409fe..41446cfa35 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -5,6 +5,7 @@ import { type TaskRunExecutionMetrics, type TaskRunExecutionResult, TaskRunExecutionRetry, + TaskRunExecutionStatus, type TaskRunFailedExecutionResult, WorkerManifest, } from "@trigger.dev/core/v3"; @@ -16,8 +17,11 @@ import { WorkloadHttpClient } from "@trigger.dev/core/v3/workers"; import { setTimeout as sleep } from "timers/promises"; import { RunExecutionSnapshotPoller } from "./poller.js"; import { assertExhaustive, tryCatch } from "@trigger.dev/core/utils"; -import { MetadataClient } from "./overrides.js"; +import { Metadata, MetadataClient } from "./overrides.js"; import { randomBytes } from "node:crypto"; +import { SnapshotManager, SnapshotState } from "./snapshot.js"; +import type { SupervisorSocket } from "./controller.js"; +import { RunNotifier } from "./notifier.js"; class ExecutionAbortError extends Error { constructor(message: string) { @@ -31,6 +35,7 @@ type RunExecutionOptions = { env: RunnerEnv; httpClient: WorkloadHttpClient; logger: RunLogger; + supervisorSocket: SupervisorSocket; }; type RunExecutionPrepareOptions = { @@ -50,9 +55,9 @@ export class RunExecution { private executionAbortController: AbortController; private _runFriendlyId?: string; - private currentSnapshotId?: string; private currentAttemptNumber?: number; private currentTaskRunEnv?: Record; + private snapshotManager?: SnapshotManager; private dequeuedAt?: Date; private podScheduledAt?: Date; @@ -67,6 +72,11 @@ export class RunExecution { private lastHeartbeat?: Date; private isShuttingDown = false; + private shutdownReason?: string; + + private supervisorSocket: SupervisorSocket; + private notifier?: RunNotifier; + private metadataClient?: MetadataClient; constructor(opts: RunExecutionOptions) { this.id = randomBytes(4).toString("hex"); @@ -74,9 +84,38 @@ export class RunExecution { this.env = opts.env; this.httpClient = opts.httpClient; this.logger = opts.logger; + this.supervisorSocket = opts.supervisorSocket; this.restoreCount = 0; this.executionAbortController = new AbortController(); + + if (this.env.TRIGGER_METADATA_URL) { + this.metadataClient = new MetadataClient(this.env.TRIGGER_METADATA_URL); + } + } + + /** + * Cancels the current execution. + */ + public async cancel(): Promise { + if (this.isShuttingDown) { + throw new Error("cancel called after execution shut down"); + } + + this.sendDebugLog("cancelling attempt", { runId: this.runFriendlyId }); + + await this.taskRunProcess?.cancel(); + } + + /** + * Kills the current execution. + */ + public async kill({ exitExecution = true }: { exitExecution?: boolean } = {}) { + await this.taskRunProcess?.kill("SIGKILL"); + + if (exitExecution) { + this.shutdown("kill"); + } } /** @@ -84,6 +123,10 @@ export class RunExecution { * This should be called before executing, typically after a successful run to prepare for the next one. */ public prepareForExecution(opts: RunExecutionPrepareOptions): this { + if (this.isShuttingDown) { + throw new Error("prepareForExecution called after execution shut down"); + } + if (this.taskRunProcess) { throw new Error("prepareForExecution called after process was already created"); } @@ -144,6 +187,14 @@ export class RunExecution { } }); + taskRunProcess.onSendDebugLog.attach(async (debugLog) => { + this.sendRuntimeDebugLog(debugLog.message, debugLog.properties); + }); + + taskRunProcess.onSetSuspendable.attach(async ({ suspendable }) => { + this.suspendable = suspendable; + }); + return taskRunProcess; } @@ -161,74 +212,28 @@ export class RunExecution { /** * Called by the RunController when it receives a websocket notification - * or when the snapshot poller detects a change + * or when the snapshot poller detects a change. + * + * This is the main entry point for snapshot changes, but processing is deferred to the snapshot manager. */ - public async handleSnapshotChange(runData: RunExecutionData): Promise { + private async enqueueSnapshotChangesAndWait(snapshots: RunExecutionData[]): Promise { if (this.isShuttingDown) { - this.sendDebugLog("handleSnapshotChange: shutting down, skipping"); - return; - } - - const { run, snapshot, completedWaitpoints } = runData; - - const snapshotMetadata = { - incomingRunId: run.friendlyId, - incomingSnapshotId: snapshot.friendlyId, - completedWaitpoints: completedWaitpoints.length, - }; - - // Ensure we have run details - if (!this.runFriendlyId || !this.currentSnapshotId) { - this.sendDebugLog( - "handleSnapshotChange: missing run or snapshot ID", - snapshotMetadata, - run.friendlyId - ); + this.sendDebugLog("enqueueSnapshotChangeAndWait: shutting down, skipping"); return; } - // Ensure the run ID matches - if (run.friendlyId !== this.runFriendlyId) { - // Send debug log to both runs - this.sendDebugLog("handleSnapshotChange: mismatched run IDs", snapshotMetadata); - this.sendDebugLog( - "handleSnapshotChange: mismatched run IDs", - snapshotMetadata, - run.friendlyId - ); + if (!this.snapshotManager) { + this.sendDebugLog("enqueueSnapshotChangeAndWait: missing snapshot manager"); return; } - this.snapshotChangeQueue.push(runData); - await this.processSnapshotChangeQueue(); - } - - private snapshotChangeQueue: RunExecutionData[] = []; - private snapshotChangeQueueLock = false; - - private async processSnapshotChangeQueue() { - if (this.snapshotChangeQueueLock) { - return; - } - - this.snapshotChangeQueueLock = true; - while (this.snapshotChangeQueue.length > 0) { - const runData = this.snapshotChangeQueue.shift(); - - if (!runData) { - continue; - } - - const [error] = await tryCatch(this.processSnapshotChange(runData)); - - if (error) { - this.sendDebugLog("Failed to process snapshot change", { error: error.message }); - } - } - this.snapshotChangeQueueLock = false; + await this.snapshotManager.handleSnapshotChanges(snapshots); } - private async processSnapshotChange(runData: RunExecutionData): Promise { + private async processSnapshotChange( + runData: RunExecutionData, + deprecated: boolean + ): Promise { const { run, snapshot, completedWaitpoints } = runData; const snapshotMetadata = { @@ -236,38 +241,36 @@ export class RunExecution { completedWaitpoints: completedWaitpoints.length, }; - // Check if the incoming snapshot is newer than the current one - if (!this.currentSnapshotId || snapshot.friendlyId < this.currentSnapshotId) { - this.sendDebugLog( - "handleSnapshotChange: received older snapshot, skipping", - snapshotMetadata - ); - return; - } - - if (snapshot.friendlyId === this.currentSnapshotId) { + if (!this.snapshotManager) { + this.sendDebugLog("handleSnapshotChange: missing snapshot manager", snapshotMetadata); return; } if (this.currentAttemptNumber && this.currentAttemptNumber !== run.attemptNumber) { - this.sendDebugLog("ERROR: attempt number mismatch", snapshotMetadata); - await this.taskRunProcess?.suspend(); + this.sendDebugLog("error: attempt number mismatch", snapshotMetadata); + // This is a rogue execution, a new one will already have been created elsewhere + await this.exitTaskRunProcessWithoutFailingRun({ flush: false }); return; } - this.sendDebugLog(`snapshot has changed to: ${snapshot.executionStatus}`, snapshotMetadata); + // DO NOT REMOVE (very noisy, but helpful for debugging) + // this.sendDebugLog(`processing snapshot change: ${snapshot.executionStatus}`, snapshotMetadata); // Reset the snapshot poll interval so we don't do unnecessary work + this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId); this.snapshotPoller?.resetCurrentInterval(); - // Update internal state - this.currentSnapshotId = snapshot.friendlyId; + if (deprecated) { + this.sendDebugLog("run execution is deprecated", { incomingSnapshot: snapshot }); - // Update services - this.snapshotPoller?.updateSnapshotId(snapshot.friendlyId); + await this.exitTaskRunProcessWithoutFailingRun({ flush: false }); + return; + } switch (snapshot.executionStatus) { case "PENDING_CANCEL": { + this.sendDebugLog("run was cancelled", snapshotMetadata); + const [error] = await tryCatch(this.cancel()); if (error) { @@ -281,113 +284,44 @@ export class RunExecution { return; } case "QUEUED": { - this.sendDebugLog("Run was re-queued", snapshotMetadata); + this.sendDebugLog("run was re-queued", snapshotMetadata); - // Pretend we've just suspended the run. This will kill the process without failing the run. - await this.taskRunProcess?.suspend(); + await this.exitTaskRunProcessWithoutFailingRun({ flush: true }); return; } case "FINISHED": { - this.sendDebugLog("Run is finished", snapshotMetadata); + this.sendDebugLog("run is finished", snapshotMetadata); - // Pretend we've just suspended the run. This will kill the process without failing the run. - await this.taskRunProcess?.suspend(); + await this.exitTaskRunProcessWithoutFailingRun({ flush: true }); return; } case "QUEUED_EXECUTING": case "EXECUTING_WITH_WAITPOINTS": { - this.sendDebugLog("Run is executing with waitpoints", snapshotMetadata); + this.sendDebugLog("run is executing with waitpoints", snapshotMetadata); - const [error] = await tryCatch(this.taskRunProcess?.cleanup(false)); - - if (error) { - this.sendDebugLog("Failed to cleanup task run process, carrying on", { - ...snapshotMetadata, - error: error.message, - }); - } - - if (snapshot.friendlyId !== this.currentSnapshotId) { - this.sendDebugLog("Snapshot changed after cleanup, abort", snapshotMetadata); - - this.abortExecution(); - return; - } - - await sleep(this.env.TRIGGER_PRE_SUSPEND_WAIT_MS); - - if (snapshot.friendlyId !== this.currentSnapshotId) { - this.sendDebugLog("Snapshot changed after suspend threshold, abort", snapshotMetadata); - - this.abortExecution(); - return; - } - - if (!this.runFriendlyId || !this.currentSnapshotId) { - this.sendDebugLog( - "handleSnapshotChange: Missing run ID or snapshot ID after suspension, abort", - snapshotMetadata - ); - - this.abortExecution(); - return; - } - - const suspendResult = await this.httpClient.suspendRun( - this.runFriendlyId, - this.currentSnapshotId - ); - - if (!suspendResult.success) { - this.sendDebugLog("Failed to suspend run, staying alive 🎶", { - ...snapshotMetadata, - error: suspendResult.error, - }); - - this.sendDebugLog("checkpoint: suspend request failed", { - ...snapshotMetadata, - error: suspendResult.error, - }); - - // This is fine, we'll wait for the next status change - return; - } - - if (!suspendResult.data.ok) { - this.sendDebugLog("checkpoint: failed to suspend run", { - snapshotId: this.currentSnapshotId, - error: suspendResult.data.error, - }); - - // This is fine, we'll wait for the next status change - return; - } - - this.sendDebugLog("Suspending, any day now 🚬", snapshotMetadata); - - // Wait for next status change + // Wait for next status change - suspension is handled by the snapshot manager return; } case "SUSPENDED": { - this.sendDebugLog("Run was suspended, kill the process", snapshotMetadata); + this.sendDebugLog("run was suspended", snapshotMetadata); // This will kill the process and fail the execution with a SuspendedProcessError - await this.taskRunProcess?.suspend(); - + // We don't flush because we already did before suspending + await this.exitTaskRunProcessWithoutFailingRun({ flush: false }); return; } case "PENDING_EXECUTING": { - this.sendDebugLog("Run is pending execution", snapshotMetadata); + this.sendDebugLog("run is pending execution", snapshotMetadata); if (completedWaitpoints.length === 0) { - this.sendDebugLog("No waitpoints to complete, nothing to do", snapshotMetadata); + this.sendDebugLog("no waitpoints to complete, nothing to do", snapshotMetadata); return; } const [error] = await tryCatch(this.restore()); if (error) { - this.sendDebugLog("Failed to restore execution", { + this.sendDebugLog("failed to restore execution", { ...snapshotMetadata, error: error.message, }); @@ -399,16 +333,15 @@ export class RunExecution { return; } case "EXECUTING": { - this.sendDebugLog("Run is now executing", snapshotMetadata); - if (completedWaitpoints.length === 0) { + this.sendDebugLog("run is executing without completed waitpoints", snapshotMetadata); return; } - this.sendDebugLog("Processing completed waitpoints", snapshotMetadata); + this.sendDebugLog("run is executing with completed waitpoints", snapshotMetadata); if (!this.taskRunProcess) { - this.sendDebugLog("No task run process, ignoring completed waitpoints", snapshotMetadata); + this.sendDebugLog("no task run process, ignoring completed waitpoints", snapshotMetadata); this.abortExecution(); return; @@ -421,7 +354,10 @@ export class RunExecution { return; } case "RUN_CREATED": { - this.sendDebugLog("Invalid status change", snapshotMetadata); + this.sendDebugLog( + "aborting execution: invalid status change: RUN_CREATED", + snapshotMetadata + ); this.abortExecution(); return; @@ -437,11 +373,11 @@ export class RunExecution { }: { isWarmStart?: boolean; }): Promise { - if (!this.runFriendlyId || !this.currentSnapshotId) { - throw new Error("Cannot start attempt: missing run or snapshot ID"); + if (!this.runFriendlyId || !this.snapshotManager) { + throw new Error("Cannot start attempt: missing run or snapshot manager"); } - this.sendDebugLog("Starting attempt"); + this.sendDebugLog("starting attempt"); const attemptStartedAt = Date.now(); @@ -452,7 +388,7 @@ export class RunExecution { const start = await this.httpClient.startRunAttempt( this.runFriendlyId, - this.currentSnapshotId, + this.snapshotManager.snapshotId, { isWarmStart } ); @@ -465,14 +401,17 @@ export class RunExecution { } // A snapshot was just created, so update the snapshot ID - this.currentSnapshotId = start.data.snapshot.friendlyId; + this.snapshotManager.updateSnapshot( + start.data.snapshot.friendlyId, + start.data.snapshot.executionStatus + ); // Also set or update the attempt number - we do this to detect illegal attempt number changes, e.g. from stalled runners coming back online const attemptNumber = start.data.run.attemptNumber; if (attemptNumber && attemptNumber > 0) { this.currentAttemptNumber = attemptNumber; } else { - this.sendDebugLog("ERROR: invalid attempt number returned from start attempt", { + this.sendDebugLog("error: invalid attempt number returned from start attempt", { attemptNumber: String(attemptNumber), }); } @@ -483,7 +422,7 @@ export class RunExecution { podScheduledAt: this.podScheduledAt?.getTime(), }); - this.sendDebugLog("Started attempt"); + this.sendDebugLog("started attempt"); return { ...start.data, metrics }; } @@ -493,45 +432,67 @@ export class RunExecution { * When this returns, the child process will have been cleaned up. */ public async execute(runOpts: RunExecutionRunOptions): Promise { + if (this.isShuttingDown) { + throw new Error("execute called after execution shut down"); + } + // Setup initial state this.runFriendlyId = runOpts.runFriendlyId; - this.currentSnapshotId = runOpts.snapshotFriendlyId; + + // Create snapshot manager + this.snapshotManager = new SnapshotManager({ + runFriendlyId: runOpts.runFriendlyId, + runnerId: this.env.TRIGGER_RUNNER_ID, + initialSnapshotId: runOpts.snapshotFriendlyId, + // We're just guessing here, but "PENDING_EXECUTING" is probably fine + initialStatus: "PENDING_EXECUTING", + logger: this.logger, + metadataClient: this.metadataClient, + onSnapshotChange: this.processSnapshotChange.bind(this), + onSuspendable: this.handleSuspendable.bind(this), + }); + this.dequeuedAt = runOpts.dequeuedAt; this.podScheduledAt = runOpts.podScheduledAt; // Create and start services this.snapshotPoller = new RunExecutionSnapshotPoller({ runFriendlyId: this.runFriendlyId, - snapshotFriendlyId: this.currentSnapshotId, - httpClient: this.httpClient, + snapshotFriendlyId: this.snapshotManager.snapshotId, logger: this.logger, snapshotPollIntervalSeconds: this.env.TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS, - handleSnapshotChange: this.handleSnapshotChange.bind(this), - }); + onPoll: this.fetchAndProcessSnapshotChanges.bind(this), + }).start(); - this.snapshotPoller.start(); + this.notifier = new RunNotifier({ + runFriendlyId: this.runFriendlyId, + supervisorSocket: this.supervisorSocket, + onNotify: this.fetchAndProcessSnapshotChanges.bind(this), + logger: this.logger, + }).start(); const [startError, start] = await tryCatch( this.startAttempt({ isWarmStart: runOpts.isWarmStart }) ); if (startError) { - this.sendDebugLog("Failed to start attempt", { error: startError.message }); + this.sendDebugLog("failed to start attempt", { error: startError.message }); - this.stopServices(); + this.shutdown("failed to start attempt"); return; } const [executeError] = await tryCatch(this.executeRunWrapper(start)); if (executeError) { - this.sendDebugLog("Failed to execute run", { error: executeError.message }); + this.sendDebugLog("failed to execute run", { error: executeError.message }); - this.stopServices(); + this.shutdown("failed to execute run"); return; } - this.stopServices(); + // This is here for safety, but it + this.shutdown("execute call finished"); } private async executeRunWrapper({ @@ -558,15 +519,12 @@ export class RunExecution { }) ); - this.sendDebugLog("Run execution completed", { error: executeError?.message }); - if (!executeError) { - this.stopServices(); return; } if (executeError instanceof SuspendedProcessError) { - this.sendDebugLog("Run was suspended", { + this.sendDebugLog("execution was suspended", { run: run.friendlyId, snapshot: snapshot.friendlyId, error: executeError.message, @@ -576,7 +534,7 @@ export class RunExecution { } if (executeError instanceof ExecutionAbortError) { - this.sendDebugLog("Run was interrupted", { + this.sendDebugLog("execution was aborted", { run: run.friendlyId, snapshot: snapshot.friendlyId, error: executeError.message, @@ -585,7 +543,7 @@ export class RunExecution { return; } - this.sendDebugLog("Error while executing attempt", { + this.sendDebugLog("error while executing attempt", { error: executeError.message, runId: run.friendlyId, snapshotId: snapshot.friendlyId, @@ -601,10 +559,8 @@ export class RunExecution { const [completeError] = await tryCatch(this.complete({ completion })); if (completeError) { - this.sendDebugLog("Failed to complete run", { error: completeError.message }); + this.sendDebugLog("failed to complete run", { error: completeError.message }); } - - this.stopServices(); } private async executeRun({ @@ -625,7 +581,7 @@ export class RunExecution { !this.taskRunProcess.isPreparedForNextAttempt ) { this.sendDebugLog("killing existing task run process before executing next attempt"); - await this.kill().catch(() => {}); + await this.kill({ exitExecution: false }).catch(() => {}); } // To skip this step and eagerly create the task run process, run prepareForExecution first @@ -637,7 +593,7 @@ export class RunExecution { // Set up an abort handler that will cleanup the task run process this.executionAbortController.signal.addEventListener("abort", async () => { - this.sendDebugLog("Execution aborted during task run, cleaning up process", { + this.sendDebugLog("execution aborted during task run, cleaning up process", { runId: execution.run.id, }); @@ -658,13 +614,13 @@ export class RunExecution { ); // If we get here, the task completed normally - this.sendDebugLog("Completed run attempt", { attemptSuccess: completion.ok }); + this.sendDebugLog("completed run attempt", { attemptSuccess: completion.ok }); // The execution has finished, so we can cleanup the task run process. Killing it should be safe. const [error] = await tryCatch(this.taskRunProcess.cleanup(true)); if (error) { - this.sendDebugLog("Failed to cleanup task run process, submitting completion anyway", { + this.sendDebugLog("failed to cleanup task run process, submitting completion anyway", { error: error.message, }); } @@ -672,37 +628,18 @@ export class RunExecution { const [completionError] = await tryCatch(this.complete({ completion })); if (completionError) { - this.sendDebugLog("Failed to complete run", { error: completionError.message }); - } - } - - /** - * Cancels the current execution. - */ - public async cancel(): Promise { - this.sendDebugLog("cancelling attempt", { runId: this.runFriendlyId }); - - await this.taskRunProcess?.cancel(); - } - - public exit() { - if (this.taskRunProcess?.isPreparedForNextRun) { - this.taskRunProcess?.forceExit(); + this.sendDebugLog("failed to complete run", { error: completionError.message }); } } - public async kill() { - await this.taskRunProcess?.kill("SIGKILL"); - } - private async complete({ completion }: { completion: TaskRunExecutionResult }): Promise { - if (!this.runFriendlyId || !this.currentSnapshotId) { - throw new Error("Cannot complete run: missing run or snapshot ID"); + if (!this.runFriendlyId || !this.snapshotManager) { + throw new Error("cannot complete run: missing run or snapshot manager"); } const completionResult = await this.httpClient.completeRunAttempt( this.runFriendlyId, - this.currentSnapshotId, + this.snapshotManager.snapshotId, { completion } ); @@ -723,50 +660,68 @@ export class RunExecution { completion: TaskRunExecutionResult; result: CompleteRunAttemptResult; }) { - this.sendDebugLog("Handling completion result", { + this.sendDebugLog(`completion result: ${result.attemptStatus}`, { attemptSuccess: completion.ok, attemptStatus: result.attemptStatus, snapshotId: result.snapshot.friendlyId, runId: result.run.friendlyId, }); - // Update our snapshot ID to match the completion result - // This ensures any subsequent API calls use the correct snapshot - this.currentSnapshotId = result.snapshot.friendlyId; + const snapshotStatus = this.convertAttemptStatusToSnapshotStatus(result.attemptStatus); - const { attemptStatus } = result; + // Update our snapshot ID to match the completion result to ensure any subsequent API calls use the correct snapshot + this.updateSnapshotAfterCompletion(result.snapshot.friendlyId, snapshotStatus); - if (attemptStatus === "RUN_FINISHED") { - this.sendDebugLog("Run finished"); - - return; - } + const { attemptStatus } = result; - if (attemptStatus === "RUN_PENDING_CANCEL") { - this.sendDebugLog("Run pending cancel"); - return; - } + switch (attemptStatus) { + case "RUN_FINISHED": + case "RUN_PENDING_CANCEL": + case "RETRY_QUEUED": { + return; + } + case "RETRY_IMMEDIATELY": { + if (attemptStatus !== "RETRY_IMMEDIATELY") { + return; + } - if (attemptStatus === "RETRY_QUEUED") { - this.sendDebugLog("Retry queued"); + if (completion.ok) { + throw new Error("Should retry but completion OK."); + } - return; - } + if (!completion.retry) { + throw new Error("Should retry but missing retry params."); + } - if (attemptStatus === "RETRY_IMMEDIATELY") { - if (completion.ok) { - throw new Error("Should retry but completion OK."); + await this.retryImmediately({ retryOpts: completion.retry }); + return; } - - if (!completion.retry) { - throw new Error("Should retry but missing retry params."); + default: { + assertExhaustive(attemptStatus); } - - await this.retryImmediately({ retryOpts: completion.retry }); - return; } + } + + private updateSnapshotAfterCompletion(snapshotId: string, status: TaskRunExecutionStatus) { + this.snapshotManager?.updateSnapshot(snapshotId, status); + this.snapshotPoller?.updateSnapshotId(snapshotId); + } - assertExhaustive(attemptStatus); + private convertAttemptStatusToSnapshotStatus( + attemptStatus: CompleteRunAttemptResult["attemptStatus"] + ): TaskRunExecutionStatus { + switch (attemptStatus) { + case "RUN_FINISHED": + return "FINISHED"; + case "RUN_PENDING_CANCEL": + return "PENDING_CANCEL"; + case "RETRY_QUEUED": + return "QUEUED"; + case "RETRY_IMMEDIATELY": + return "EXECUTING"; + default: + assertExhaustive(attemptStatus); + } } private measureExecutionMetrics({ @@ -809,7 +764,7 @@ export class RunExecution { } private async retryImmediately({ retryOpts }: { retryOpts: TaskRunExecutionRetry }) { - this.sendDebugLog("Retrying run immediately", { + this.sendDebugLog("retrying run immediately", { timestamp: retryOpts.timestamp, delay: retryOpts.delay, }); @@ -825,43 +780,41 @@ export class RunExecution { const [startError, start] = await tryCatch(this.startAttempt({ isWarmStart: true })); if (startError) { - this.sendDebugLog("Failed to start attempt for retry", { error: startError.message }); + this.sendDebugLog("failed to start attempt for retry", { error: startError.message }); - this.stopServices(); + this.shutdown("retryImmediately: failed to start attempt"); return; } const [executeError] = await tryCatch(this.executeRunWrapper({ ...start, isWarmStart: true })); if (executeError) { - this.sendDebugLog("Failed to execute run for retry", { error: executeError.message }); + this.sendDebugLog("failed to execute run for retry", { error: executeError.message }); - this.stopServices(); + this.shutdown("retryImmediately: failed to execute run"); return; } - - this.stopServices(); } /** * Restores a suspended execution from PENDING_EXECUTING */ private async restore(): Promise { - this.sendDebugLog("Restoring execution"); + this.sendDebugLog("restoring execution"); - if (!this.runFriendlyId || !this.currentSnapshotId) { - throw new Error("Cannot restore: missing run or snapshot ID"); + if (!this.runFriendlyId || !this.snapshotManager) { + throw new Error("Cannot restore: missing run or snapshot manager"); } // Short delay to give websocket time to reconnect await sleep(100); // Process any env overrides - await this.processEnvOverrides(); + await this.processEnvOverrides("restore"); const continuationResult = await this.httpClient.continueRunExecution( this.runFriendlyId, - this.currentSnapshotId + this.snapshotManager.snapshotId ); if (!continuationResult.success) { @@ -872,24 +825,44 @@ export class RunExecution { this.restoreCount++; } + private async exitTaskRunProcessWithoutFailingRun({ flush }: { flush: boolean }) { + await this.taskRunProcess?.suspend({ flush }); + + // No services should be left running after this line - let's make sure of it + this.shutdown("exitTaskRunProcessWithoutFailingRun"); + } + /** * Processes env overrides from the metadata service. Generally called when we're resuming from a suspended state. */ - private async processEnvOverrides() { - if (!this.env.TRIGGER_METADATA_URL) { - this.sendDebugLog("No metadata URL, skipping env overrides"); - return; + public async processEnvOverrides(reason?: string): Promise<{ overrides: Metadata } | null> { + if (!this.metadataClient) { + return null; } - const metadataClient = new MetadataClient(this.env.TRIGGER_METADATA_URL); - const overrides = await metadataClient.getEnvOverrides(); + const [error, overrides] = await this.metadataClient.getEnvOverrides(); - if (!overrides) { - this.sendDebugLog("No env overrides, skipping"); - return; + if (error) { + this.sendDebugLog("[override] failed to fetch", { + reason, + error: error.message, + }); + return null; } - this.sendDebugLog("Processing env overrides", overrides); + if (overrides.TRIGGER_RUN_ID && overrides.TRIGGER_RUN_ID !== this.runFriendlyId) { + this.sendDebugLog("[override] run ID mismatch, ignoring overrides", { + reason, + currentRunId: this.runFriendlyId, + incomingRunId: overrides.TRIGGER_RUN_ID, + }); + return null; + } + + this.sendDebugLog(`[override] processing: ${reason}`, { + overrides, + currentEnv: this.env.raw, + }); // Override the env with the new values this.env.override(overrides); @@ -908,25 +881,32 @@ export class RunExecution { if (overrides.TRIGGER_RUNNER_ID) { this.httpClient.updateRunnerId(this.env.TRIGGER_RUNNER_ID); } + + return { + overrides, + }; } private async onHeartbeat() { if (!this.runFriendlyId) { - this.sendDebugLog("Heartbeat: missing run ID"); + this.sendDebugLog("heartbeat: missing run ID"); return; } - if (!this.currentSnapshotId) { - this.sendDebugLog("Heartbeat: missing snapshot ID"); + if (!this.snapshotManager) { + this.sendDebugLog("heartbeat: missing snapshot manager"); return; } - this.sendDebugLog("Heartbeat: started"); + this.sendDebugLog("heartbeat"); - const response = await this.httpClient.heartbeatRun(this.runFriendlyId, this.currentSnapshotId); + const response = await this.httpClient.heartbeatRun( + this.runFriendlyId, + this.snapshotManager.snapshotId + ); if (!response.success) { - this.sendDebugLog("Heartbeat: failed", { error: response.error }); + this.sendDebugLog("heartbeat: failed", { error: response.error }); } this.lastHeartbeat = new Date(); @@ -943,7 +923,27 @@ export class RunExecution { properties: { ...properties, runId: this.runFriendlyId, - snapshotId: this.currentSnapshotId, + snapshotId: this.currentSnapshotFriendlyId, + executionId: this.id, + executionRestoreCount: this.restoreCount, + lastHeartbeat: this.lastHeartbeat?.toISOString(), + }, + }); + } + + private sendRuntimeDebugLog( + message: string, + properties?: SendDebugLogOptions["properties"], + runIdOverride?: string + ) { + this.logger.sendDebugLog({ + runId: runIdOverride ?? this.runFriendlyId, + message: `[runtime] ${message}`, + print: false, + properties: { + ...properties, + runId: this.runFriendlyId, + snapshotId: this.currentSnapshotFriendlyId, executionId: this.id, executionRestoreCount: this.restoreCount, lastHeartbeat: this.lastHeartbeat?.toISOString(), @@ -951,6 +951,12 @@ export class RunExecution { }); } + private set suspendable(suspendable: boolean) { + this.snapshotManager?.setSuspendable(suspendable).catch((error) => { + this.sendDebugLog("failed to set suspendable", { error: error.message }); + }); + } + // Ensure we can only set this once private set runFriendlyId(id: string) { if (this._runFriendlyId) { @@ -965,7 +971,7 @@ export class RunExecution { } public get currentSnapshotFriendlyId(): string | undefined { - return this.currentSnapshotId; + return this.snapshotManager?.snapshotId; } public get taskRunEnv(): Record | undefined { @@ -974,7 +980,12 @@ export class RunExecution { public get metrics() { return { - restoreCount: this.restoreCount, + execution: { + restoreCount: this.restoreCount, + lastHeartbeat: this.lastHeartbeat, + }, + poller: this.snapshotPoller?.metrics, + notifier: this.notifier?.metrics, }; } @@ -984,21 +995,154 @@ export class RunExecution { private abortExecution() { if (this.isAborted) { - this.sendDebugLog("Execution already aborted"); + this.sendDebugLog("execution already aborted"); return; } this.executionAbortController.abort(); - this.stopServices(); + this.shutdown("abortExecution"); } - private stopServices() { + private shutdown(reason: string) { if (this.isShuttingDown) { + this.sendDebugLog(`[shutdown] ${reason} (already shutting down)`, { + firstShutdownReason: this.shutdownReason, + }); return; } + this.sendDebugLog(`[shutdown] ${reason}`); + this.isShuttingDown = true; + this.shutdownReason = reason; + this.snapshotPoller?.stop(); - this.taskRunProcess?.onTaskRunHeartbeat.detach(); + this.snapshotManager?.stop(); + this.notifier?.stop(); + + this.taskRunProcess?.unsafeDetachEvtHandlers(); + } + + private async handleSuspendable(suspendableSnapshot: SnapshotState) { + this.sendDebugLog("handleSuspendable", { suspendableSnapshot }); + + if (!this.snapshotManager) { + this.sendDebugLog("handleSuspendable: missing snapshot manager", { suspendableSnapshot }); + return; + } + + // Ensure this is the current snapshot + if (suspendableSnapshot.id !== this.currentSnapshotFriendlyId) { + this.sendDebugLog("snapshot changed before cleanup, abort", { + suspendableSnapshot, + currentSnapshotId: this.currentSnapshotFriendlyId, + }); + this.abortExecution(); + return; + } + + // First cleanup the task run process + const [error] = await tryCatch(this.taskRunProcess?.cleanup(false)); + + if (error) { + this.sendDebugLog("failed to cleanup task run process, carrying on", { + suspendableSnapshot, + error: error.message, + }); + } + + // Double check snapshot hasn't changed after cleanup + if (suspendableSnapshot.id !== this.currentSnapshotFriendlyId) { + this.sendDebugLog("snapshot changed after cleanup, abort", { + suspendableSnapshot, + currentSnapshotId: this.currentSnapshotFriendlyId, + }); + this.abortExecution(); + return; + } + + if (!this.runFriendlyId) { + this.sendDebugLog("missing run ID for suspension, abort", { suspendableSnapshot }); + this.abortExecution(); + return; + } + + // Call the suspend API with the current snapshot ID + const suspendResult = await this.httpClient.suspendRun( + this.runFriendlyId, + suspendableSnapshot.id + ); + + if (!suspendResult.success) { + this.sendDebugLog("suspension request failed, staying alive 🎶", { + suspendableSnapshot, + error: suspendResult.error, + }); + + // This is fine, we'll wait for the next status change + return; + } + + if (!suspendResult.data.ok) { + this.sendDebugLog("suspension request returned error, staying alive 🎶", { + suspendableSnapshot, + error: suspendResult.data.error, + }); + + // This is fine, we'll wait for the next status change + return; + } + + this.sendDebugLog("suspending, any day now 🚬", { suspendableSnapshot }); + } + + /** + * Fetches the latest execution data and enqueues snapshot changes. Used by both poller and notification handlers. + * @param source string - where this call originated (e.g. 'poller', 'notification') + */ + public async fetchAndProcessSnapshotChanges(source: string): Promise { + if (!this.runFriendlyId) { + this.sendDebugLog(`fetchAndProcessSnapshotChanges: missing runFriendlyId`, { source }); + return; + } + + // Use the last processed snapshot as the since parameter + const sinceSnapshotId = this.currentSnapshotFriendlyId; + + if (!sinceSnapshotId) { + this.sendDebugLog(`fetchAndProcessSnapshotChanges: missing sinceSnapshotId`, { source }); + return; + } + + const response = await this.httpClient.getSnapshotsSince(this.runFriendlyId, sinceSnapshotId); + + if (!response.success) { + this.sendDebugLog(`fetchAndProcessSnapshotChanges: failed to get snapshots since`, { + source, + error: response.error, + }); + + await this.processEnvOverrides("snapshots since error"); + return; + } + + const { snapshots } = response.data; + + if (!snapshots.length) { + return; + } + + const [error] = await tryCatch(this.enqueueSnapshotChangesAndWait(snapshots)); + + if (error) { + this.sendDebugLog( + `fetchAndProcessSnapshotChanges: failed to enqueue and process snapshot change`, + { + source, + error: error.message, + } + ); + return; + } } } diff --git a/packages/cli-v3/src/entryPoints/managed/logger.ts b/packages/cli-v3/src/entryPoints/managed/logger.ts index 3a7a045476..150d740094 100644 --- a/packages/cli-v3/src/entryPoints/managed/logger.ts +++ b/packages/cli-v3/src/entryPoints/managed/logger.ts @@ -1,31 +1,38 @@ import { + DebugLogPropertiesInput, WorkloadDebugLogRequestBody, WorkloadHttpClient, } from "@trigger.dev/core/v3/runEngineWorker"; import { RunnerEnv } from "./env.js"; +import { flattenAttributes } from "@trigger.dev/core/v3"; export type SendDebugLogOptions = { runId?: string; message: string; date?: Date; - properties?: WorkloadDebugLogRequestBody["properties"]; + properties?: DebugLogPropertiesInput; + print?: boolean; }; +export interface RunLogger { + sendDebugLog(options: SendDebugLogOptions): void; +} + export type RunLoggerOptions = { httpClient: WorkloadHttpClient; env: RunnerEnv; }; -export class RunLogger { +export class ManagedRunLogger implements RunLogger { private readonly httpClient: WorkloadHttpClient; private readonly env: RunnerEnv; - constructor(private readonly opts: RunLoggerOptions) { + constructor(opts: RunLoggerOptions) { this.httpClient = opts.httpClient; this.env = opts.env; } - sendDebugLog({ runId, message, date, properties }: SendDebugLogOptions) { + sendDebugLog({ runId, message, date, properties, print = true }: SendDebugLogOptions) { if (!runId) { runId = this.env.TRIGGER_RUN_ID; } @@ -41,12 +48,32 @@ export class RunLogger { workerName: this.env.TRIGGER_WORKER_INSTANCE_NAME, }; - console.log(message, mergedProperties); + if (print) { + console.log(message, mergedProperties); + } + + const flattenedProperties = flattenAttributes( + mergedProperties + ) satisfies WorkloadDebugLogRequestBody["properties"]; this.httpClient.sendDebugLog(runId, { message, time: date ?? new Date(), - properties: mergedProperties, + properties: flattenedProperties, }); } } + +export class ConsoleRunLogger implements RunLogger { + private readonly print: boolean; + + constructor(opts: { print?: boolean } = {}) { + this.print = opts.print ?? true; + } + + sendDebugLog({ message, properties }: SendDebugLogOptions): void { + if (this.print) { + console.log("[ConsoleLogger]", message, properties); + } + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/notifier.ts b/packages/cli-v3/src/entryPoints/managed/notifier.ts new file mode 100644 index 0000000000..6f23c1b861 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/notifier.ts @@ -0,0 +1,90 @@ +import type { SupervisorSocket } from "./controller.js"; +import type { RunLogger, SendDebugLogOptions } from "./logger.js"; + +type OnNotify = (source: string) => Promise; + +type RunNotifierOptions = { + runFriendlyId: string; + supervisorSocket: SupervisorSocket; + onNotify: OnNotify; + logger: RunLogger; +}; + +export class RunNotifier { + private runFriendlyId: string; + private socket: SupervisorSocket; + private onNotify: OnNotify; + private logger: RunLogger; + + private lastNotificationAt: Date | null = null; + private notificationCount = 0; + + private lastInvalidNotificationAt: Date | null = null; + private invalidNotificationCount = 0; + + constructor(opts: RunNotifierOptions) { + this.runFriendlyId = opts.runFriendlyId; + this.socket = opts.supervisorSocket; + this.onNotify = opts.onNotify; + this.logger = opts.logger; + } + + start(): RunNotifier { + this.sendDebugLog("start"); + + this.socket.on("run:notify", async ({ version, run }) => { + // Generate a unique ID for the notification + const notificationId = Math.random().toString(36).substring(2, 15); + + // Use this to track the notification incl. any processing + const notification = { + id: notificationId, + runId: run.friendlyId, + version, + }; + + if (run.friendlyId !== this.runFriendlyId) { + this.sendDebugLog("run:notify received invalid notification", { notification }); + + this.invalidNotificationCount++; + this.lastInvalidNotificationAt = new Date(); + + return; + } + + this.sendDebugLog("run:notify received by runner", { notification }); + + this.notificationCount++; + this.lastNotificationAt = new Date(); + + await this.onNotify(`notifier:${notificationId}`); + }); + + return this; + } + + stop() { + this.sendDebugLog("stop"); + this.socket.removeAllListeners("run:notify"); + } + + get metrics() { + return { + lastNotificationAt: this.lastNotificationAt, + notificationCount: this.notificationCount, + lastInvalidNotificationAt: this.lastInvalidNotificationAt, + invalidNotificationCount: this.invalidNotificationCount, + }; + } + + private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: `[notifier] ${message}`, + properties: { + ...properties, + ...this.metrics, + }, + }); + } +} diff --git a/packages/cli-v3/src/entryPoints/managed/overrides.ts b/packages/cli-v3/src/entryPoints/managed/overrides.ts index 872b5ad0b3..85c5607105 100644 --- a/packages/cli-v3/src/entryPoints/managed/overrides.ts +++ b/packages/cli-v3/src/entryPoints/managed/overrides.ts @@ -1,4 +1,6 @@ export type Metadata = { + TRIGGER_RUN_ID: string | undefined; + TRIGGER_SNAPSHOT_ID: string | undefined; TRIGGER_SUPERVISOR_API_PROTOCOL: string | undefined; TRIGGER_SUPERVISOR_API_DOMAIN: string | undefined; TRIGGER_SUPERVISOR_API_PORT: number | undefined; @@ -17,13 +19,17 @@ export class MetadataClient { this.url = new URL(url); } - async getEnvOverrides(): Promise { + async getEnvOverrides(): Promise<[error: Error, data: null] | [error: null, data: Metadata]> { try { const response = await fetch(new URL("/env", this.url)); - return response.json(); + + if (!response.ok) { + return [new Error(`Status ${response.status} ${response.statusText}`), null]; + } + + return [null, await response.json()]; } catch (error) { - console.error("Failed to fetch metadata", { error }); - return null; + return [error instanceof Error ? error : new Error(String(error)), null]; } } } diff --git a/packages/cli-v3/src/entryPoints/managed/poller.ts b/packages/cli-v3/src/entryPoints/managed/poller.ts index 814833846e..b26aa5ab21 100644 --- a/packages/cli-v3/src/entryPoints/managed/poller.ts +++ b/packages/cli-v3/src/entryPoints/managed/poller.ts @@ -1,14 +1,14 @@ -import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker"; import { RunLogger, SendDebugLogOptions } from "./logger.js"; -import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3"; +import { IntervalService } from "@trigger.dev/core/v3"; + +type OnPoll = (source: string) => Promise; export type RunExecutionSnapshotPollerOptions = { runFriendlyId: string; snapshotFriendlyId: string; - httpClient: WorkloadHttpClient; logger: RunLogger; snapshotPollIntervalSeconds: number; - handleSnapshotChange: (execution: RunExecutionData) => Promise; + onPoll: OnPoll; }; export class RunExecutionSnapshotPoller { @@ -16,19 +16,20 @@ export class RunExecutionSnapshotPoller { private snapshotFriendlyId: string; private enabled: boolean; - private readonly httpClient: WorkloadHttpClient; private readonly logger: RunLogger; - private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise; + private readonly onPoll: OnPoll; private readonly poller: IntervalService; + private lastPollAt: Date | null = null; + private pollCount = 0; + constructor(opts: RunExecutionSnapshotPollerOptions) { this.enabled = false; this.runFriendlyId = opts.runFriendlyId; this.snapshotFriendlyId = opts.snapshotFriendlyId; - this.httpClient = opts.httpClient; this.logger = opts.logger; - this.handleSnapshotChange = opts.handleSnapshotChange; + this.onPoll = opts.onPoll; const intervalMs = opts.snapshotPollIntervalSeconds * 1000; @@ -41,19 +42,10 @@ export class RunExecutionSnapshotPoller { this.sendDebugLog("polling for latest snapshot"); - const response = await this.httpClient.getRunExecutionData(this.runFriendlyId); - - if (!response.success) { - this.sendDebugLog("failed to get run execution data", { error: response.error }); - return; - } + this.lastPollAt = new Date(); + this.pollCount++; - if (!this.enabled) { - this.sendDebugLog("poller disabled, skipping snapshot change handler (post)"); - return; - } - - await this.handleSnapshotChange(response.data.execution); + await this.onPoll("poller"); }, intervalMs, leadingEdge: false, @@ -63,27 +55,13 @@ export class RunExecutionSnapshotPoller { }); }, }); - - this.sendDebugLog("created"); - } - - private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) { - this.logger.sendDebugLog({ - runId: this.runFriendlyId, - message: `[poller] ${message}`, - properties: { - ...properties, - runId: this.runFriendlyId, - snapshotId: this.snapshotFriendlyId, - pollIntervalMs: this.poller.intervalMs, - }, - }); } resetCurrentInterval() { this.poller.resetCurrentInterval(); } + // The snapshot ID is only used as an indicator of when a poller got stuck updateSnapshotId(snapshotFriendlyId: string) { this.snapshotFriendlyId = snapshotFriendlyId; } @@ -92,14 +70,18 @@ export class RunExecutionSnapshotPoller { this.poller.updateInterval(intervalMs); } - start() { + start(): RunExecutionSnapshotPoller { if (this.enabled) { this.sendDebugLog("already started"); - return; + return this; } + this.sendDebugLog("start"); + this.enabled = true; this.poller.start(); + + return this; } stop() { @@ -108,7 +90,34 @@ export class RunExecutionSnapshotPoller { return; } + this.sendDebugLog("stop"); + this.enabled = false; - this.poller.stop(); + + const { isExecuting } = this.poller.stop(); + + if (isExecuting) { + this.sendDebugLog("stopped while executing"); + } + } + + get metrics() { + return { + lastPollAt: this.lastPollAt, + pollCount: this.pollCount, + }; + } + + private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: `[poller] ${message}`, + properties: { + ...properties, + ...this.metrics, + snapshotId: this.snapshotFriendlyId, + pollIntervalMs: this.poller.intervalMs, + }, + }); } } diff --git a/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts b/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts new file mode 100644 index 0000000000..05cba11f38 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/snapshot.test.ts @@ -0,0 +1,735 @@ +import { SnapshotManager } from "./snapshot.js"; +import { ConsoleRunLogger } from "./logger.js"; +import { RunExecutionData, TaskRunExecutionStatus } from "@trigger.dev/core/v3"; +import { setTimeout } from "timers/promises"; +import { isCI } from "std-env"; + +describe("SnapshotManager", () => { + const mockLogger = new ConsoleRunLogger({ print: !isCI }); + const mockSnapshotHandler = vi.fn(); + const mockSuspendableHandler = vi.fn(); + + let manager: SnapshotManager; + + beforeEach(() => { + vi.clearAllMocks(); + manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: mockSnapshotHandler, + onSuspendable: mockSuspendableHandler, + }); + }); + + it("should initialize with correct initial values", () => { + expect(manager.snapshotId).toBe("snapshot-1"); + expect(manager.status).toBe("PENDING_EXECUTING"); + expect(manager.suspendable).toBe(false); + }); + + it("should update snapshot when newer snapshot ID is provided", () => { + manager.updateSnapshot("snapshot-2", "EXECUTING"); + expect(manager.snapshotId).toBe("snapshot-2"); + expect(manager.status).toBe("EXECUTING"); + }); + + it("should not update snapshot when older snapshot ID is provided", () => { + manager.updateSnapshot("snapshot-2", "EXECUTING"); + manager.updateSnapshot("snapshot-1", "FINISHED"); + expect(manager.snapshotId).toBe("snapshot-2"); + expect(manager.status).toBe("EXECUTING"); + }); + + it("should handle suspendable state changes", async () => { + await manager.setSuspendable(true); + expect(manager.suspendable).toBe(true); + expect(mockSuspendableHandler).not.toHaveBeenCalled(); + + // When status changes to EXECUTING_WITH_WAITPOINTS, suspendable handler should be called + await manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-2", + executionStatus: "EXECUTING_WITH_WAITPOINTS", + }), + ]); + + expect(mockSuspendableHandler).toHaveBeenCalledWith({ + id: "snapshot-2", + status: "EXECUTING_WITH_WAITPOINTS", + }); + + // Reset mocks + vi.clearAllMocks(); + + // Test this the other way around + await manager.setSuspendable(false); + expect(manager.suspendable).toBe(false); + expect(mockSuspendableHandler).not.toHaveBeenCalled(); + + // We should still be EXECUTING_WITH_WAITPOINTS + expect(manager.status).toBe("EXECUTING_WITH_WAITPOINTS"); + + // When we're suspendable again, the handler should be called + await manager.setSuspendable(true); + expect(manager.suspendable).toBe(true); + expect(mockSuspendableHandler).toHaveBeenCalledWith({ + id: "snapshot-2", + status: "EXECUTING_WITH_WAITPOINTS", + }); + + // Reset mocks + vi.clearAllMocks(); + + // Check simple toggle + await manager.setSuspendable(false); + expect(manager.suspendable).toBe(false); + await manager.setSuspendable(true); + expect(manager.suspendable).toBe(true); + expect(mockSuspendableHandler).toHaveBeenCalledWith({ + id: "snapshot-2", + status: "EXECUTING_WITH_WAITPOINTS", + }); + + // Reset mocks + vi.clearAllMocks(); + + // Transitioning to QUEUED_EXECUTING should call the handler again + await manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-3", + executionStatus: "QUEUED_EXECUTING", + }), + ]); + expect(mockSuspendableHandler).toHaveBeenCalledWith({ + id: "snapshot-3", + status: "QUEUED_EXECUTING", + }); + }); + + it("should process queue in correct order with suspendable changes at the back", async () => { + const executionOrder: string[] = []; + + // Create a manager with handlers that track execution order + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + executionOrder.push(`snapshot:${data.snapshot.friendlyId}`); + await setTimeout(10); // Small delay + }, + onSuspendable: async (state) => { + executionOrder.push(`suspendable:${state.id}`); + await setTimeout(10); // Small delay + }, + }); + + const promises = [ + manager.setSuspendable(false), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.setSuspendable(true), + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-3", + executionStatus: "EXECUTING_WITH_WAITPOINTS", + }), + ]), + ]; + + await Promise.all(promises); + + // Verify execution order: + // 1. Snapshots should be processed in order (2 then 3) + // 2. Suspendable changes should be at the end + expect(executionOrder).toEqual([ + "snapshot:snapshot-2", + "snapshot:snapshot-3", + "suspendable:snapshot-3", + ]); + }); + + it("should skip older snapshots", async () => { + const executionOrder: string[] = []; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + executionOrder.push(`snapshot:${data.snapshot.friendlyId}`); + }, + onSuspendable: async () => {}, + }); + + // Queue snapshots in reverse order + const promises = [ + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-3" })]), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-1" })]), + ]; + + await Promise.all(promises); + + // Should be processed in ID order + expect(executionOrder).toEqual(["snapshot:snapshot-3"]); + }); + + it("should skip duplicate snapshots", async () => { + const executionOrder: string[] = []; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + executionOrder.push(`snapshot:${data.snapshot.friendlyId}`); + }, + onSuspendable: async () => {}, + }); + + // Queue snapshots in reverse order + const promises = [ + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + ]; + + await Promise.all(promises); + + // Should be processed in ID order + expect(executionOrder).toEqual(["snapshot:snapshot-2"]); + }); + + it("should prevent concurrent handler execution", async () => { + const executionTimes: { start: number; end: number; type: string }[] = []; + let currentlyExecuting = false; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + if (currentlyExecuting) { + throw new Error("Handler executed while another handler was running"); + } + currentlyExecuting = true; + const start = Date.now(); + await setTimeout(20); // Deliberate delay to increase chance of catching concurrent execution + const end = Date.now(); + executionTimes.push({ start, end, type: `snapshot:${data.snapshot.friendlyId}` }); + currentlyExecuting = false; + }, + onSuspendable: async (state) => { + if (currentlyExecuting) { + throw new Error("Handler executed while another handler was running"); + } + currentlyExecuting = true; + const start = Date.now(); + await setTimeout(20); // Deliberate delay + const end = Date.now(); + executionTimes.push({ start, end, type: `suspendable:${state.id}` }); + currentlyExecuting = false; + }, + }); + + // Create a mix of rapid changes + const promises = [ + manager.setSuspendable(true), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.setSuspendable(false), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-3" })]), + manager.setSuspendable(true), + manager.setSuspendable(true), + manager.setSuspendable(false), + manager.setSuspendable(false), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-4" })]), + manager.setSuspendable(false), + manager.setSuspendable(true), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-1" })]), + manager.setSuspendable(false), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-3" })]), + manager.setSuspendable(true), + manager.setSuspendable(true), + manager.setSuspendable(false), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.setSuspendable(false), + ]; + + await Promise.all(promises); + + // Verify no overlapping execution times + for (let i = 1; i < executionTimes.length; i++) { + const previous = executionTimes[i - 1]!; + const current = executionTimes[i]!; + expect(current.start).toBeGreaterThanOrEqual(previous.end); + } + }); + + it("should handle cleanup and error scenarios", async () => { + const executionOrder: string[] = []; + let shouldThrowSnapshotError = false; + let shouldThrowSuspendableError = false; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + if (shouldThrowSnapshotError) { + throw new Error("Snapshot handler error"); + } + executionOrder.push(`snapshot:${data.snapshot.friendlyId}`); + await setTimeout(10); + }, + onSuspendable: async (state) => { + if (shouldThrowSuspendableError) { + throw new Error("Suspendable handler error"); + } + executionOrder.push(`suspendable:${state.id}`); + await setTimeout(10); + }, + }); + + // Queue up some changes + const initialPromises = [ + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-2" })]), + manager.setSuspendable(true), + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-3" })]), + ]; + + expect(manager.queueLength).not.toBe(0); + + // Dispose manager before any promises complete + manager.stop(); + + expect(manager.queueLength).toBe(0); + + // These should complete without executing handlers + const results = await Promise.allSettled(initialPromises); + + // Only the first snapshot should have been processed + expect(executionOrder).toEqual(["snapshot:snapshot-2"]); + + // The last two promises should have been rejected + expect(results).toMatchObject([ + { status: "fulfilled" }, + { status: "rejected" }, + { status: "rejected" }, + ]); + + // Now test error handling + shouldThrowSnapshotError = true; + await expect( + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "snapshot-4" })]) + ).rejects.toThrow("Snapshot handler error"); + + // Queue should continue processing after error + shouldThrowSnapshotError = false; + await manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-5", + executionStatus: "EXECUTING_WITH_WAITPOINTS", + }), + ]); + expect(executionOrder).toEqual(["snapshot:snapshot-2", "snapshot:snapshot-5"]); + + // Test suspendable error + shouldThrowSuspendableError = true; + await expect(manager.setSuspendable(true)).rejects.toThrow("Suspendable handler error"); + + // Queue should continue processing after suspendable error + shouldThrowSuspendableError = false; + + // Toggle suspendable state to trigger handler + await manager.setSuspendable(false); + await manager.setSuspendable(true); + + expect(executionOrder).toEqual([ + "snapshot:snapshot-2", + "snapshot:snapshot-5", + "suspendable:snapshot-5", + ]); + }); + + it("should handle edge cases and high concurrency", async () => { + const executionOrder: string[] = []; + const executionTimes: { start: number; end: number; type: string }[] = []; + let currentlyExecuting = false; + let handlerExecutionCount = 0; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + if (currentlyExecuting) { + throw new Error("Handler executed while another handler was running"); + } + currentlyExecuting = true; + handlerExecutionCount++; + + const start = Date.now(); + executionOrder.push(`snapshot:${data.snapshot.friendlyId}`); + await setTimeout(Math.random() * 20); // Random delay to increase race condition chances + const end = Date.now(); + + executionTimes.push({ start, end, type: `snapshot:${data.snapshot.friendlyId}` }); + currentlyExecuting = false; + }, + onSuspendable: async (state) => { + if (currentlyExecuting) { + throw new Error("Handler executed while another handler was running"); + } + currentlyExecuting = true; + handlerExecutionCount++; + + const start = Date.now(); + executionOrder.push(`suspendable:${state.id}`); + await setTimeout(Math.random() * 20); // Random delay + const end = Date.now(); + + executionTimes.push({ start, end, type: `suspendable:${state.id}` }); + currentlyExecuting = false; + }, + }); + + // Test empty snapshot IDs + await manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: "" })]); + expect(executionOrder).toEqual([]); + + // Create a very long queue of mixed changes + const promises: Promise[] = []; + + // Add 50 mixed changes + for (let i = 1; i <= 50; i++) { + if (i % 2 === 0) { + promises.push( + manager.handleSnapshotChanges([createRunExecutionData({ snapshotId: `snapshot-${i}` })]) + ); + } else { + promises.push(manager.setSuspendable(i % 4 === 1)); + } + } + + // Add rapid toggling of suspendable state + for (let i = 0; i < 20; i++) { + promises.push(manager.setSuspendable(i % 2 === 0)); + } + + // Add overlapping snapshot changes + const snapshotIds = ["A", "B", "C", "D", "E"]; + for (const id of snapshotIds) { + for (let i = 0; i < 5; i++) { + promises.push( + manager.handleSnapshotChanges([ + createRunExecutionData({ snapshotId: `snapshot-${id}-${i}` }), + ]) + ); + } + } + + await Promise.all(promises); + + // Verify handler execution exclusivity + for (let i = 1; i < executionTimes.length; i++) { + const previous = executionTimes[i - 1]!; + const current = executionTimes[i]!; + expect(current.start).toBeGreaterThanOrEqual(previous.end); + } + + // Verify all handlers executed in sequence + expect(currentlyExecuting).toBe(false); + + // Verify suspendable state is correctly maintained + const finalSuspendableState = manager.suspendable; + const lastSuspendableChange = executionOrder + .filter((entry) => entry.startsWith("suspendable:")) + .pop(); + + // The last recorded suspendable change should match the final state + if (finalSuspendableState) { + expect(lastSuspendableChange).toBeDefined(); + } + + // Verify snapshot ordering + const snapshotExecutions = executionOrder + .filter((entry) => entry.startsWith("snapshot:")) + .map((entry) => entry.split(":")[1]); + + // Each snapshot should be greater than the previous one + for (let i = 1; i < snapshotExecutions.length; i++) { + expect(snapshotExecutions[i]! > snapshotExecutions[i - 1]!).toBe(true); + } + }); + + it("should handle queue processing and remaining edge cases", async () => { + const executionOrder: string[] = []; + let processingCount = 0; + + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + processingCount++; + executionOrder.push(`snapshot:${data.snapshot.friendlyId}`); + await setTimeout(10); + processingCount--; + }, + onSuspendable: async (state) => { + processingCount++; + executionOrder.push(`suspendable:${state.id}`); + await setTimeout(10); + processingCount--; + }, + }); + + // Test parallel queue processing prevention + const parallelPromises = Array.from({ length: 5 }, (_, i) => + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: `parallel-${i}`, + executionStatus: "EXECUTING", + }), + ]) + ); + + // Add some suspendable changes in the middle + parallelPromises.push(manager.setSuspendable(true)); + parallelPromises.push(manager.setSuspendable(false)); + + // Add more snapshot changes + parallelPromises.push( + ...Array.from({ length: 5 }, (_, i) => + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: `parallel-${i + 5}`, + executionStatus: "EXECUTING", + }), + ]) + ) + ); + + await Promise.all(parallelPromises); + + // Verify processingCount never exceeded 1 + expect(processingCount).toBe(0); + + // Test edge case: snapshot ID comparison with special characters + const specialCharPromises = [ + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-1!", + executionStatus: "EXECUTING", + }), + ]), + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-1@", + executionStatus: "EXECUTING", + }), + ]), + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "snapshot-1#", + executionStatus: "EXECUTING", + }), + ]), + ]; + + await Promise.all(specialCharPromises); + + // Test edge case: very long snapshot IDs + const longIdPromises = [ + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "a".repeat(1000), + executionStatus: "EXECUTING", + }), + ]), + manager.handleSnapshotChanges([ + createRunExecutionData({ + snapshotId: "b".repeat(1000), + executionStatus: "EXECUTING", + }), + ]), + ]; + + await Promise.all(longIdPromises); + + // Test edge case: rapid queue changes during processing + let isProcessing = false; + const rapidChangeManager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-2", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + onSnapshotChange: async (data) => { + if (isProcessing) { + throw new Error("Parallel processing detected"); + } + isProcessing = true; + await setTimeout(50); // Longer delay to test queue changes during processing + executionOrder.push(`rapid:${data.snapshot.friendlyId}`); + isProcessing = false; + }, + onSuspendable: async () => {}, + }); + + // Start processing a snapshot + const initialPromise = rapidChangeManager.handleSnapshotChanges([ + createRunExecutionData({ + runId: "test-run-2", + snapshotId: "snapshot-2", + executionStatus: "EXECUTING", + }), + ]); + + // Queue more changes while the first one is processing + await setTimeout(10); + const queuePromises = [ + rapidChangeManager.handleSnapshotChanges([ + createRunExecutionData({ + runId: "test-run-2", + snapshotId: "snapshot-3", + executionStatus: "EXECUTING", + }), + ]), + rapidChangeManager.handleSnapshotChanges([ + createRunExecutionData({ + runId: "test-run-2", + snapshotId: "snapshot-4", + executionStatus: "EXECUTING", + }), + ]), + ]; + + await Promise.all([initialPromise, ...queuePromises]); + + // Verify the rapid changes were processed in order + const rapidChanges = executionOrder.filter((entry) => entry.startsWith("rapid:")); + expect(rapidChanges).toEqual(["rapid:snapshot-2", "rapid:snapshot-3", "rapid:snapshot-4"]); + }); + + it("should detect restore and not deprecate restored runner", async () => { + // Mock MetadataClient + let runnerId = "test-runner-1"; + const mockMetadataClient = { + getEnvOverrides: vi.fn().mockImplementation(() => { + return Promise.resolve([null, { TRIGGER_RUNNER_ID: runnerId }]); + }), + }; + + const onSnapshotChange = vi.fn(); + const manager = new SnapshotManager({ + runnerId: "test-runner-1", + runFriendlyId: "test-run-1", + initialSnapshotId: "snapshot-1", + initialStatus: "PENDING_EXECUTING", + logger: mockLogger, + metadataClient: mockMetadataClient as any, + onSnapshotChange, + onSuspendable: async () => {}, + }); + + // Simulate some basic snapshot changes + await manager.handleSnapshotChanges([ + createRunExecutionData({ snapshotId: "snapshot-2", executionStatus: "EXECUTING" }), + createRunExecutionData({ + snapshotId: "snapshot-3", + executionStatus: "EXECUTING_WITH_WAITPOINTS", + }), + ]); + + // Should call onSnapshotChange with deprecated = false + expect(onSnapshotChange).toHaveBeenCalledWith( + expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-3" }) }), + false + ); + + // Reset the mock + onSnapshotChange.mockClear(); + + // Simulate a series of snapshot changes with deprecation markers and a restored runner + // (standard checkpoint / restore flow) + runnerId = "test-runner-2"; + await manager.handleSnapshotChanges([ + createRunExecutionData({ snapshotId: "snapshot-4", executionStatus: "SUSPENDED" }), + createRunExecutionData({ snapshotId: "snapshot-5", executionStatus: "QUEUED" }), + createRunExecutionData({ snapshotId: "snapshot-6", executionStatus: "EXECUTING" }), + ]); + + // Should call onSnapshotChange with deprecated = false + expect(onSnapshotChange).toHaveBeenCalledWith( + expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-6" }) }), + false + ); + + // Reset the mock + onSnapshotChange.mockClear(); + + // Simulate a new snapshot with a deprecation marker in previous snapshots, but no restore + await manager.handleSnapshotChanges([ + createRunExecutionData({ snapshotId: "snapshot-7", executionStatus: "QUEUED" }), + createRunExecutionData({ snapshotId: "snapshot-8", executionStatus: "EXECUTING" }), + ]); + // Should call onSnapshotChange with deprecated = true + expect(onSnapshotChange).toHaveBeenCalledWith( + expect.objectContaining({ snapshot: expect.objectContaining({ friendlyId: "snapshot-8" }) }), + true + ); + }); +}); + +// Helper to generate RunExecutionData with sensible defaults +function createRunExecutionData( + overrides: { + runId?: string; + runFriendlyId?: string; + snapshotId?: string; + snapshotFriendlyId?: string; + executionStatus?: TaskRunExecutionStatus; + description?: string; + } = {} +): RunExecutionData { + const runId = overrides.runId ?? "test-run-1"; + const runFriendlyId = overrides.runFriendlyId ?? runId; + const snapshotId = overrides.snapshotId ?? "snapshot-1"; + const snapshotFriendlyId = overrides.snapshotFriendlyId ?? snapshotId; + + return { + version: "1" as const, + run: { + id: runId, + friendlyId: runFriendlyId, + status: "EXECUTING", + attemptNumber: 1, + }, + snapshot: { + id: snapshotId, + friendlyId: snapshotFriendlyId, + executionStatus: overrides.executionStatus ?? "EXECUTING", + description: overrides.description ?? "Test snapshot", + createdAt: new Date(), + }, + completedWaitpoints: [], + }; +} diff --git a/packages/cli-v3/src/entryPoints/managed/snapshot.ts b/packages/cli-v3/src/entryPoints/managed/snapshot.ts new file mode 100644 index 0000000000..75d3d4b036 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/snapshot.ts @@ -0,0 +1,396 @@ +import { tryCatch } from "@trigger.dev/core/utils"; +import { RunLogger, SendDebugLogOptions } from "./logger.js"; +import { TaskRunExecutionStatus, type RunExecutionData } from "@trigger.dev/core/v3"; +import { assertExhaustive } from "@trigger.dev/core/utils"; +import { MetadataClient } from "./overrides.js"; + +export type SnapshotState = { + id: string; + status: TaskRunExecutionStatus; +}; + +type SnapshotHandler = (runData: RunExecutionData, deprecated: boolean) => Promise; +type SuspendableHandler = (suspendableSnapshot: SnapshotState) => Promise; + +type SnapshotManagerOptions = { + runFriendlyId: string; + runnerId: string; + initialSnapshotId: string; + initialStatus: TaskRunExecutionStatus; + logger: RunLogger; + metadataClient?: MetadataClient; + onSnapshotChange: SnapshotHandler; + onSuspendable: SuspendableHandler; +}; + +type QueuedChange = + | { id: string; type: "snapshot"; snapshots: RunExecutionData[] } + | { id: string; type: "suspendable"; value: boolean }; + +type QueuedChangeItem = { + change: QueuedChange; + resolve: () => void; + reject: (error: Error) => void; +}; + +export class SnapshotManager { + private runFriendlyId: string; + private runnerId: string; + + private logger: RunLogger; + private metadataClient?: MetadataClient; + + private state: SnapshotState; + private isSuspendable: boolean = false; + + private readonly onSnapshotChange: SnapshotHandler; + private readonly onSuspendable: SuspendableHandler; + + private changeQueue: QueuedChangeItem[] = []; + private isProcessingQueue = false; + + constructor(opts: SnapshotManagerOptions) { + this.runFriendlyId = opts.runFriendlyId; + this.runnerId = opts.runnerId; + + this.logger = opts.logger; + this.metadataClient = opts.metadataClient; + + this.state = { + id: opts.initialSnapshotId, + status: opts.initialStatus, + }; + + this.onSnapshotChange = opts.onSnapshotChange; + this.onSuspendable = opts.onSuspendable; + } + + public get snapshotId(): string { + return this.state.id; + } + + public get status(): TaskRunExecutionStatus { + return this.state.status; + } + + public get suspendable(): boolean { + return this.isSuspendable; + } + + public async setSuspendable(suspendable: boolean): Promise { + if (this.isSuspendable === suspendable) { + this.sendDebugLog(`skipping suspendable update, already ${suspendable}`); + return; + } + + this.sendDebugLog(`setting suspendable to ${suspendable}`); + + return this.enqueueSnapshotChange({ + id: crypto.randomUUID(), + type: "suspendable", + value: suspendable, + }); + } + + /** + * Update the snapshot ID and status without invoking any handlers + * + * @param snapshotId - The ID of the snapshot to update to + * @param status - The status to update to + */ + public updateSnapshot(snapshotId: string, status: TaskRunExecutionStatus) { + // Check if this is an old snapshot + if (snapshotId < this.state.id) { + this.sendDebugLog("skipping update for old snapshot", { + incomingId: snapshotId, + currentId: this.state.id, + }); + return; + } + + this.state = { id: snapshotId, status }; + } + + public async handleSnapshotChanges(snapshots: RunExecutionData[]): Promise { + if (!this.statusCheck(snapshots)) { + return; + } + + return this.enqueueSnapshotChange({ + id: crypto.randomUUID(), + type: "snapshot", + snapshots, + }); + } + + public get queueLength(): number { + return this.changeQueue.length; + } + + private statusCheck(snapshots: RunExecutionData[]): boolean { + const latestSnapshot = snapshots[snapshots.length - 1]; + + if (!latestSnapshot) { + this.sendDebugLog("skipping status check for empty snapshots", { + snapshots, + }); + return false; + } + + const { run, snapshot } = latestSnapshot; + + const statusCheckData = { + incomingId: snapshot.friendlyId, + incomingStatus: snapshot.executionStatus, + currentId: this.state.id, + currentStatus: this.state.status, + }; + + // Ensure run ID matches + if (run.friendlyId !== this.runFriendlyId) { + this.sendDebugLog("skipping update for mismatched run ID", { + statusCheckData, + }); + + return false; + } + + // Skip if this is an old snapshot + if (snapshot.friendlyId < this.state.id) { + this.sendDebugLog("skipping update for old snapshot", { + statusCheckData, + }); + + return false; + } + + // Skip if this is the current snapshot + if (snapshot.friendlyId === this.state.id) { + // DO NOT REMOVE (very noisy, but helpful for debugging) + // this.sendDebugLog("skipping update for duplicate snapshot", { + // statusCheckData, + // }); + + return false; + } + + return true; + } + + private async enqueueSnapshotChange(change: QueuedChange): Promise { + return new Promise((resolve, reject) => { + // For suspendable changes, resolve and remove any pending suspendable changes since only the last one matters + if (change.type === "suspendable") { + const pendingSuspendable = this.changeQueue.filter( + (item) => item.change.type === "suspendable" + ); + + // Resolve any pending suspendable changes - they're effectively done since we're superseding them + for (const item of pendingSuspendable) { + item.resolve(); + } + + // Remove the exact items we just resolved + const resolvedIds = new Set(pendingSuspendable.map((item) => item.change.id)); + this.changeQueue = this.changeQueue.filter((item) => !resolvedIds.has(item.change.id)); + } + + this.changeQueue.push({ change, resolve, reject }); + + // Sort queue: + // 1. Suspendable changes always go to the back + // 2. Snapshot changes are ordered by creation time, with the latest snapshot last + this.changeQueue.sort((a, b) => { + if (a.change.type === "suspendable" && b.change.type === "snapshot") { + return 1; // a goes after b + } + if (a.change.type === "snapshot" && b.change.type === "suspendable") { + return -1; // a goes before b + } + if (a.change.type === "snapshot" && b.change.type === "snapshot") { + const snapshotA = a.change.snapshots[a.change.snapshots.length - 1]; + const snapshotB = b.change.snapshots[b.change.snapshots.length - 1]; + + if (!snapshotA || !snapshotB) { + return 0; + } + + // Sort snapshot changes by creation time, old -> new + return snapshotA.snapshot.createdAt.getTime() - snapshotB.snapshot.createdAt.getTime(); + } + return 0; // both suspendable, maintain insertion order + }); + + // Start processing if not already running + this.processQueue().catch((error) => { + this.sendDebugLog("error processing queue", { error: error.message }); + }); + }); + } + + private async processQueue() { + if (this.isProcessingQueue) { + return; + } + + this.isProcessingQueue = true; + + try { + while (this.queueLength > 0) { + // Remove first item from queue + const item = this.changeQueue.shift(); + if (!item) { + break; + } + + const [error] = await tryCatch(this.applyChange(item.change)); + + // Resolve/reject promise + if (error) { + item.reject(error); + } else { + item.resolve(); + } + } + } finally { + const hasMoreItems = this.queueLength > 0; + this.isProcessingQueue = false; + + if (hasMoreItems) { + this.processQueue().catch((error) => { + this.sendDebugLog("error processing queue (finally)", { error: error.message }); + }); + } + } + } + + private async applyChange(change: QueuedChange): Promise { + switch (change.type) { + case "snapshot": { + const { snapshots } = change; + + // Double check we should process this snapshot + if (!this.statusCheck(snapshots)) { + return; + } + + const latestSnapshot = change.snapshots[change.snapshots.length - 1]; + if (!latestSnapshot) { + return; + } + + // These are the snapshots between the current and the latest one + const previousSnapshots = snapshots.slice(0, -1); + + // Check if any previous snapshot is QUEUED or SUSPENDED + const deprecatedStatus: TaskRunExecutionStatus[] = ["QUEUED", "SUSPENDED"]; + const deprecatedSnapshots = previousSnapshots.filter((snap) => + deprecatedStatus.includes(snap.snapshot.executionStatus) + ); + + let deprecated = false; + if (deprecatedSnapshots.length > 0) { + const hasBeenRestored = await this.hasBeenRestored(); + + if (hasBeenRestored) { + // It's normal for a restored run to have deprecation markers, e.g. it will have been SUSPENDED + deprecated = false; + } else { + deprecated = true; + } + } + + const { snapshot } = latestSnapshot; + const oldState = { ...this.state }; + + this.updateSnapshot(snapshot.friendlyId, snapshot.executionStatus); + + this.sendDebugLog(`status changed to ${snapshot.executionStatus}`, { + oldId: oldState.id, + newId: snapshot.friendlyId, + oldStatus: oldState.status, + newStatus: snapshot.executionStatus, + deprecated, + }); + + // Execute handler + await this.onSnapshotChange(latestSnapshot, deprecated); + + // Check suspendable state after snapshot change + await this.checkSuspendableState(); + break; + } + case "suspendable": { + this.isSuspendable = change.value; + + // Check suspendable state after suspendable change + await this.checkSuspendableState(); + break; + } + default: { + assertExhaustive(change); + } + } + } + + private async hasBeenRestored() { + if (!this.metadataClient) { + return false; + } + + const [error, overrides] = await this.metadataClient.getEnvOverrides(); + + if (error) { + return false; + } + + if (!overrides.TRIGGER_RUNNER_ID) { + return false; + } + + if (overrides.TRIGGER_RUNNER_ID === this.runnerId) { + return false; + } + + this.runnerId = overrides.TRIGGER_RUNNER_ID; + + return true; + } + + private async checkSuspendableState() { + if ( + this.isSuspendable && + (this.state.status === "EXECUTING_WITH_WAITPOINTS" || + this.state.status === "QUEUED_EXECUTING") + ) { + // DO NOT REMOVE (very noisy, but helpful for debugging) + // this.sendDebugLog("run is now suspendable, executing handler"); + await this.onSuspendable(this.state); + } + } + + public stop() { + this.sendDebugLog("stop"); + + // Clear any pending changes + for (const item of this.changeQueue) { + item.reject(new Error("SnapshotManager stopped")); + } + this.changeQueue = []; + } + + protected sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]) { + this.logger.sendDebugLog({ + runId: this.runFriendlyId, + message: `[snapshot] ${message}`, + properties: { + ...properties, + snapshotId: this.state.id, + status: this.state.status, + suspendable: this.isSuspendable, + queueLength: this.queueLength, + isProcessingQueue: this.isProcessingQueue, + }, + }); + } +} diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index f6e179195b..6821fbdfc9 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -9,6 +9,7 @@ import { TaskRunExecutionPayload, TaskRunExecutionResult, type TaskRunInternalError, + tryCatch, WorkerManifest, WorkerToExecutorMessageCatalog, } from "@trigger.dev/core/v3"; @@ -33,19 +34,15 @@ import { SuspendedProcessError, } from "@trigger.dev/core/v3/errors"; -export type OnWaitForDurationMessage = InferSocketMessageSchema< +export type OnSendDebugLogMessage = InferSocketMessageSchema< typeof ExecutorToWorkerMessageCatalog, - "WAIT_FOR_DURATION" + "SEND_DEBUG_LOG" >; -export type OnWaitForTaskMessage = InferSocketMessageSchema< - typeof ExecutorToWorkerMessageCatalog, - "WAIT_FOR_TASK" ->; -export type OnWaitForBatchMessage = InferSocketMessageSchema< + +export type OnSetSuspendableMessage = InferSocketMessageSchema< typeof ExecutorToWorkerMessageCatalog, - "WAIT_FOR_BATCH" + "SET_SUSPENDABLE" >; -export type OnWaitMessage = InferSocketMessageSchema; export type TaskRunProcessOptions = { workerManifest: WorkerManifest; @@ -82,11 +79,8 @@ export class TaskRunProcess { public onExit: Evt<{ code: number | null; signal: NodeJS.Signals | null; pid?: number }> = new Evt(); public onIsBeingKilled: Evt = new Evt(); - public onReadyToDispose: Evt = new Evt(); - - public onWaitForTask: Evt = new Evt(); - public onWaitForBatch: Evt = new Evt(); - public onWait: Evt = new Evt(); + public onSendDebugLog: Evt = new Evt(); + public onSetSuspendable: Evt = new Evt(); private _isPreparedForNextRun: boolean = false; private _isPreparedForNextAttempt: boolean = false; @@ -104,6 +98,14 @@ export class TaskRunProcess { return this._isPreparedForNextAttempt; } + unsafeDetachEvtHandlers() { + this.onExit.detach(); + this.onIsBeingKilled.detach(); + this.onSendDebugLog.detach(); + this.onSetSuspendable.detach(); + this.onTaskRunHeartbeat.detach(); + } + async cancel() { this._isPreparedForNextRun = false; this._isBeingCancelled = true; @@ -195,23 +197,18 @@ export class TaskRunProcess { resolver(result); }, - READY_TO_DISPOSE: async () => { - logger.debug(`task run process is ready to dispose`); - - this.onReadyToDispose.post(this); - }, TASK_HEARTBEAT: async (message) => { this.onTaskRunHeartbeat.post(message.id); }, - WAIT_FOR_TASK: async (message) => { - this.onWaitForTask.post(message); - }, - WAIT_FOR_BATCH: async (message) => { - this.onWaitForBatch.post(message); - }, UNCAUGHT_EXCEPTION: async (message) => { logger.debug("uncaught exception in task run process", { ...message }); }, + SEND_DEBUG_LOG: async (message) => { + this.onSendDebugLog.post(message); + }, + SET_SUSPENDABLE: async (message) => { + this.onSetSuspendable.post(message); + }, }, }); @@ -289,56 +286,6 @@ export class TaskRunProcess { return result; } - taskRunCompletedNotification(completion: TaskRunExecutionResult) { - if (!completion.ok && typeof completion.retry !== "undefined") { - logger.debug( - "Task run completed with error and wants to retry, won't send task run completed notification" - ); - return; - } - - if (!this._child?.connected || this._isBeingKilled || this._child.killed) { - logger.debug( - "Child process not connected or being killed, can't send task run completed notification" - ); - return; - } - - this._ipc?.send("TASK_RUN_COMPLETED_NOTIFICATION", { - version: "v2", - completion, - }); - } - - waitCompletedNotification() { - if (!this._child?.connected || this._isBeingKilled || this._child.killed) { - console.error( - "Child process not connected or being killed, can't send wait completed notification" - ); - return; - } - - this._ipc?.send("WAIT_COMPLETED_NOTIFICATION", {}); - } - - waitpointCreated(waitId: string, waitpointId: string) { - if (!this._child?.connected || this._isBeingKilled || this._child.killed) { - console.error( - "Child process not connected or being killed, can't send waitpoint created notification" - ); - return; - } - - this._ipc?.send("WAITPOINT_CREATED", { - wait: { - id: waitId, - }, - waitpoint: { - id: waitpointId, - }, - }); - } - waitpointCompleted(waitpoint: CompletedWaitpoint) { if (!this._child?.connected || this._isBeingKilled || this._child.killed) { console.error( @@ -347,9 +294,7 @@ export class TaskRunProcess { return; } - this._ipc?.send("WAITPOINT_COMPLETED", { - waitpoint, - }); + this._ipc?.send("RESOLVE_WAITPOINT", { waitpoint }); } async #handleExit(code: number | null, signal: NodeJS.Signals | null) { @@ -441,6 +386,7 @@ export class TaskRunProcess { this._stderr.push(errorLine); } + /** This will never throw. */ async kill(signal?: number | NodeJS.Signals, timeoutInMs?: number) { logger.debug(`killing task run process`, { signal, @@ -454,26 +400,35 @@ export class TaskRunProcess { this.onIsBeingKilled.post(this); - this._child?.kill(signal); + try { + this._child?.kill(signal); + } catch (error) { + logger.debug("kill: failed to kill child process", { error }); + } + + if (!timeoutInMs) { + return; + } + + const [error] = await tryCatch(killTimeout); - if (timeoutInMs) { - await killTimeout; + if (error) { + logger.debug("kill: failed to wait for child process to exit", { error }); } } - async suspend() { + async suspend({ flush }: { flush: boolean }) { this._isBeingSuspended = true; - await this.kill("SIGKILL"); - } - forceExit() { - try { - this._isBeingKilled = true; + if (flush) { + const [error] = await tryCatch(this.#flush()); - this._child?.kill("SIGKILL"); - } catch (error) { - logger.debug("forceExit: failed to kill child process", { error }); + if (error) { + console.error("Error flushing task run process", { error }); + } } + + await this.kill("SIGKILL"); } get isBeingKilled() { diff --git a/packages/cli-v3/tsconfig.json b/packages/cli-v3/tsconfig.json index bef5b58828..d09a1e841e 100644 --- a/packages/cli-v3/tsconfig.json +++ b/packages/cli-v3/tsconfig.json @@ -5,6 +5,9 @@ }, { "path": "./tsconfig.e2e.test.json" + }, + { + "path": "./tsconfig.test.json" } ] } diff --git a/packages/cli-v3/tsconfig.src.json b/packages/cli-v3/tsconfig.src.json index 3ebcb153c9..364e98ce28 100644 --- a/packages/cli-v3/tsconfig.src.json +++ b/packages/cli-v3/tsconfig.src.json @@ -1,6 +1,7 @@ { "extends": "../../.configs/tsconfig.base.json", "include": ["./src/**/*.ts"], + "exclude": ["./src/**/*.test.ts"], "compilerOptions": { "isolatedDeclarations": false, "composite": true, diff --git a/packages/cli-v3/tsconfig.test.json b/packages/cli-v3/tsconfig.test.json new file mode 100644 index 0000000000..52b05f9281 --- /dev/null +++ b/packages/cli-v3/tsconfig.test.json @@ -0,0 +1,11 @@ +{ + "extends": "../../.configs/tsconfig.base.json", + "include": ["./src/**/*.ts"], + "references": [{ "path": "./tsconfig.src.json" }], + "compilerOptions": { + "isolatedDeclarations": false, + "composite": true, + "customConditions": ["@triggerdotdev/source"], + "types": ["vitest/globals"] + } +} diff --git a/packages/core/src/v3/runEngineWorker/supervisor/http.ts b/packages/core/src/v3/runEngineWorker/supervisor/http.ts index 4f899e4f22..43305b456a 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/http.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/http.ts @@ -17,6 +17,7 @@ import { WorkerApiDebugLogBody, WorkerApiSuspendRunRequestBody, WorkerApiSuspendRunResponseBody, + WorkerApiRunSnapshotsSinceResponseBody, } from "./schemas.js"; import { SupervisorClientCommonOptions } from "./types.js"; import { getDefaultWorkerHeaders } from "./util.js"; @@ -185,6 +186,20 @@ export class SupervisorHttpClient { ); } + async getSnapshotsSince(runId: string, snapshotId: string, runnerId?: string) { + return wrapZodFetch( + WorkerApiRunSnapshotsSinceResponseBody, + `${this.apiUrl}/engine/v1/worker-actions/runs/${runId}/snapshots/since/${snapshotId}`, + { + method: "GET", + headers: { + ...this.defaultHeaders, + ...this.runnerIdHeader(runnerId), + }, + } + ); + } + async sendDebugLog(runId: string, body: WorkerApiDebugLogBody, runnerId?: string): Promise { try { const res = await wrapZodFetch( diff --git a/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts b/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts index abae1e28b3..a49fd53a09 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/schemas.ts @@ -126,21 +126,32 @@ export type WorkerApiDequeueFromVersionResponseBody = z.infer< typeof WorkerApiDequeueFromVersionResponseBody >; -const AttributeValue = z.union([ +export const DebugLogPropertiesValue = z.union([ z.string(), z.number(), z.boolean(), - z.array(z.string().nullable()), - z.array(z.number().nullable()), - z.array(z.boolean().nullable()), + z.array(z.string().nullish()), + z.array(z.number().nullish()), + z.array(z.boolean().nullish()), ]); -const Attributes = z.record(z.string(), AttributeValue.optional()); +export const DebugLogProperties = z.record(z.string(), DebugLogPropertiesValue.optional()); +export type DebugLogProperties = z.infer; + +export const DebugLogPropertiesInput = z.record(z.string(), z.unknown()); +export type DebugLogPropertiesInput = z.infer; + +export const WorkerApiDebugLogBodyInput = z.object({ + time: z.coerce.date(), + message: z.string(), + properties: DebugLogPropertiesInput.optional(), +}); +export type WorkerApiDebugLogBodyInput = z.infer; export const WorkerApiDebugLogBody = z.object({ time: z.coerce.date(), message: z.string(), - properties: Attributes.optional(), + properties: DebugLogProperties.optional(), }); export type WorkerApiDebugLogBody = z.infer; @@ -151,3 +162,10 @@ export const WorkerApiSuspendCompletionResponseBody = z.object({ export type WorkerApiSuspendCompletionResponseBody = z.infer< typeof WorkerApiSuspendCompletionResponseBody >; + +export const WorkerApiRunSnapshotsSinceResponseBody = z.object({ + snapshots: z.array(RunExecutionData), +}); +export type WorkerApiRunSnapshotsSinceResponseBody = z.infer< + typeof WorkerApiRunSnapshotsSinceResponseBody +>; diff --git a/packages/core/src/v3/runEngineWorker/workload/http.ts b/packages/core/src/v3/runEngineWorker/workload/http.ts index 9dde07d35d..22978be81b 100644 --- a/packages/core/src/v3/runEngineWorker/workload/http.ts +++ b/packages/core/src/v3/runEngineWorker/workload/http.ts @@ -11,6 +11,7 @@ import { WorkloadSuspendRunResponseBody, WorkloadContinueRunExecutionResponseBody, WorkloadDebugLogRequestBody, + WorkloadRunSnapshotsSinceResponseBody, } from "./schemas.js"; import { WorkloadClientCommonOptions } from "./types.js"; import { getDefaultWorkloadHeaders } from "./util.js"; @@ -142,6 +143,19 @@ export class WorkloadHttpClient { ); } + async getSnapshotsSince(runId: string, snapshotId: string) { + return wrapZodFetch( + WorkloadRunSnapshotsSinceResponseBody, + `${this.apiUrl}/api/v1/workload-actions/runs/${runId}/snapshots/since/${snapshotId}`, + { + method: "GET", + headers: { + ...this.defaultHeaders(), + }, + } + ); + } + async sendDebugLog(runId: string, body: WorkloadDebugLogRequestBody): Promise { try { const res = await wrapZodFetch( diff --git a/packages/core/src/v3/runEngineWorker/workload/schemas.ts b/packages/core/src/v3/runEngineWorker/workload/schemas.ts index 93c4252d37..14f3b8efe3 100644 --- a/packages/core/src/v3/runEngineWorker/workload/schemas.ts +++ b/packages/core/src/v3/runEngineWorker/workload/schemas.ts @@ -10,6 +10,7 @@ import { WorkerApiDequeueFromVersionResponseBody, WorkerApiContinueRunExecutionRequestBody, WorkerApiDebugLogBody, + WorkerApiRunSnapshotsSinceResponseBody, } from "../supervisor/schemas.js"; export const WorkloadHeartbeatRequestBody = WorkerApiRunHeartbeatRequestBody; @@ -64,3 +65,8 @@ export const WorkloadDequeueFromVersionResponseBody = WorkerApiDequeueFromVersio export type WorkloadDequeueFromVersionResponseBody = z.infer< typeof WorkloadDequeueFromVersionResponseBody >; + +export const WorkloadRunSnapshotsSinceResponseBody = WorkerApiRunSnapshotsSinceResponseBody; +export type WorkloadRunSnapshotsSinceResponseBody = z.infer< + typeof WorkloadRunSnapshotsSinceResponseBody +>; diff --git a/packages/core/src/v3/runtime/managedRuntimeManager.ts b/packages/core/src/v3/runtime/managedRuntimeManager.ts deleted file mode 100644 index d23f800d76..0000000000 --- a/packages/core/src/v3/runtime/managedRuntimeManager.ts +++ /dev/null @@ -1,235 +0,0 @@ -import { clock } from "../clock-api.js"; -import { lifecycleHooks } from "../lifecycle-hooks-api.js"; -import { - BatchTaskRunExecutionResult, - CompletedWaitpoint, - TaskRunContext, - TaskRunExecutionResult, - TaskRunFailedExecutionResult, - TaskRunSuccessfulExecutionResult, - WaitpointTokenResult, -} from "../schemas/index.js"; -import { ExecutorToWorkerProcessConnection } from "../zodIpc.js"; -import { RuntimeManager } from "./manager.js"; -import { preventMultipleWaits } from "./preventMultipleWaits.js"; - -type Resolver = (value: CompletedWaitpoint) => void; - -export class ManagedRuntimeManager implements RuntimeManager { - // Maps a resolver ID to a resolver function - private readonly resolversByWaitId: Map = new Map(); - // Maps a waitpoint ID to a wait ID - private readonly resolversByWaitpoint: Map = new Map(); - - private _preventMultipleWaits = preventMultipleWaits(); - - constructor( - private ipc: ExecutorToWorkerProcessConnection, - private showLogs: boolean - ) { - // Log out the runtime status on a long interval to help debug stuck executions - setInterval(() => { - this.log("[DEBUG] ManagedRuntimeManager status", this.status); - }, 300_000); - } - - disable(): void { - // do nothing - } - - async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { - return this._preventMultipleWaits(async () => { - const promise = new Promise((resolve) => { - this.resolversByWaitId.set(params.id, resolve); - }); - - await lifecycleHooks.callOnWaitHookListeners({ - type: "task", - runId: params.id, - }); - - const waitpoint = await promise; - const result = this.waitpointToTaskRunExecutionResult(waitpoint); - - await lifecycleHooks.callOnResumeHookListeners({ - type: "task", - runId: params.id, - }); - - return result; - }); - } - - async waitForBatch(params: { - id: string; - runCount: number; - ctx: TaskRunContext; - }): Promise { - return this._preventMultipleWaits(async () => { - if (!params.runCount) { - return Promise.resolve({ id: params.id, items: [] }); - } - - const promise = Promise.all( - Array.from({ length: params.runCount }, (_, index) => { - const resolverId = `${params.id}_${index}`; - return new Promise((resolve, reject) => { - this.resolversByWaitId.set(resolverId, resolve); - }); - }) - ); - - await lifecycleHooks.callOnWaitHookListeners({ - type: "batch", - batchId: params.id, - runCount: params.runCount, - }); - - const waitpoints = await promise; - - await lifecycleHooks.callOnResumeHookListeners({ - type: "batch", - batchId: params.id, - runCount: params.runCount, - }); - - return { - id: params.id, - items: waitpoints.map(this.waitpointToTaskRunExecutionResult), - }; - }); - } - - async waitForWaitpoint({ - waitpointFriendlyId, - finishDate, - }: { - waitpointFriendlyId: string; - finishDate?: Date; - }): Promise { - return this._preventMultipleWaits(async () => { - const promise = new Promise((resolve) => { - this.resolversByWaitId.set(waitpointFriendlyId, resolve); - }); - - if (finishDate) { - await lifecycleHooks.callOnWaitHookListeners({ - type: "duration", - date: finishDate, - }); - } else { - await lifecycleHooks.callOnWaitHookListeners({ - type: "token", - token: waitpointFriendlyId, - }); - } - - const waitpoint = await promise; - - if (finishDate) { - await lifecycleHooks.callOnResumeHookListeners({ - type: "duration", - date: finishDate, - }); - } else { - await lifecycleHooks.callOnResumeHookListeners({ - type: "token", - token: waitpointFriendlyId, - }); - } - - return { - ok: !waitpoint.outputIsError, - output: waitpoint.output, - outputType: waitpoint.outputType, - }; - }); - } - - associateWaitWithWaitpoint(waitId: string, waitpointId: string) { - this.resolversByWaitpoint.set(waitpointId, waitId); - } - - async completeWaitpoints(waitpoints: CompletedWaitpoint[]): Promise { - await Promise.all(waitpoints.map((waitpoint) => this.completeWaitpoint(waitpoint))); - } - - private completeWaitpoint(waitpoint: CompletedWaitpoint): void { - this.log("completeWaitpoint", waitpoint); - - let waitId: string | undefined; - - if (waitpoint.completedByTaskRun) { - if (waitpoint.completedByTaskRun.batch) { - waitId = `${waitpoint.completedByTaskRun.batch.friendlyId}_${waitpoint.index}`; - } else { - waitId = waitpoint.completedByTaskRun.friendlyId; - } - } else if (waitpoint.completedByBatch) { - //no waitpoint resolves associated with batch completions - //a batch completion isn't when all the runs from a batch are completed - return; - } else if (waitpoint.type === "MANUAL" || waitpoint.type === "DATETIME") { - waitId = waitpoint.friendlyId; - } else { - waitId = this.resolversByWaitpoint.get(waitpoint.id); - } - - if (!waitId) { - this.log("No waitId found for waitpoint", { ...this.status, ...waitpoint }); - return; - } - - const resolve = this.resolversByWaitId.get(waitId); - - if (!resolve) { - this.log("No resolver found for waitId", { ...this.status, waitId }); - return; - } - - this.log("Resolving waitpoint", waitpoint); - - // Ensure current time is accurate before resolving the waitpoint - clock.reset(); - - resolve(waitpoint); - - this.resolversByWaitId.delete(waitId); - } - - private waitpointToTaskRunExecutionResult(waitpoint: CompletedWaitpoint): TaskRunExecutionResult { - if (!waitpoint.completedByTaskRun?.friendlyId) throw new Error("Missing completedByTaskRun"); - - if (waitpoint.outputIsError) { - return { - ok: false, - id: waitpoint.completedByTaskRun.friendlyId, - error: waitpoint.output - ? JSON.parse(waitpoint.output) - : { - type: "STRING_ERROR", - message: "Missing error output", - }, - } satisfies TaskRunFailedExecutionResult; - } else { - return { - ok: true, - id: waitpoint.completedByTaskRun.friendlyId, - output: waitpoint.output, - outputType: waitpoint.outputType ?? "application/json", - } satisfies TaskRunSuccessfulExecutionResult; - } - } - - private log(message: string, ...args: any[]) { - if (!this.showLogs) return; - console.log(`[${new Date().toISOString()}] ${message}`, args); - } - - private get status() { - return { - resolversbyWaitId: Array.from(this.resolversByWaitId.keys()), - resolversByWaitpoint: Array.from(this.resolversByWaitpoint.keys()), - }; - } -} diff --git a/packages/core/src/v3/runtime/sharedRuntimeManager.ts b/packages/core/src/v3/runtime/sharedRuntimeManager.ts new file mode 100644 index 0000000000..a513e3a1d6 --- /dev/null +++ b/packages/core/src/v3/runtime/sharedRuntimeManager.ts @@ -0,0 +1,349 @@ +import { assertExhaustive } from "../../utils.js"; +import { clock } from "../clock-api.js"; +import { lifecycleHooks } from "../lifecycle-hooks-api.js"; +import { DebugLogPropertiesInput } from "../runEngineWorker/index.js"; +import { + BatchTaskRunExecutionResult, + CompletedWaitpoint, + TaskRunContext, + TaskRunExecutionResult, + TaskRunFailedExecutionResult, + TaskRunSuccessfulExecutionResult, + WaitpointTokenResult, +} from "../schemas/index.js"; +import { tryCatch } from "../tryCatch.js"; +import { ExecutorToWorkerProcessConnection } from "../zodIpc.js"; +import { RuntimeManager } from "./manager.js"; +import { preventMultipleWaits } from "./preventMultipleWaits.js"; + +/** A function that resolves a waitpoint */ +type Resolver = (value: CompletedWaitpoint) => void; + +/** Branded type for resolver IDs to keep us from doing anything stupid */ +type ResolverId = string & { readonly __brand: unique symbol }; + +export class SharedRuntimeManager implements RuntimeManager { + /** Maps a resolver ID to a resolver function */ + private readonly resolversById = new Map(); + + /** Stores waitpoints that arrive before their resolvers have been created */ + private readonly waitpointsByResolverId = new Map(); + + private _preventMultipleWaits = preventMultipleWaits(); + + constructor( + private ipc: ExecutorToWorkerProcessConnection, + private showLogs: boolean + ) { + // Log out the runtime status on a long interval to help debug stuck executions + setInterval(() => { + this.debugLog("SharedRuntimeManager status"); + }, 300_000); + } + + disable(): void { + // do nothing + } + + async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise { + return this._preventMultipleWaits(async () => { + const promise = new Promise((resolve) => { + this.resolversById.set(params.id as ResolverId, resolve); + }); + + // Resolve any waitpoints we received before the resolver was created + this.resolvePendingWaitpoints(); + + await lifecycleHooks.callOnWaitHookListeners({ + type: "task", + runId: params.id, + }); + + const waitpoint = await this.suspendable(promise); + const result = this.waitpointToTaskRunExecutionResult(waitpoint); + + await lifecycleHooks.callOnResumeHookListeners({ + type: "task", + runId: params.id, + }); + + return result; + }); + } + + async waitForBatch(params: { + id: string; + runCount: number; + ctx: TaskRunContext; + }): Promise { + return this._preventMultipleWaits(async () => { + if (!params.runCount) { + return Promise.resolve({ id: params.id, items: [] }); + } + + const promises = Array.from({ length: params.runCount }, (_, index) => { + const resolverId = `${params.id}_${index}` as ResolverId; + + return new Promise((resolve, reject) => { + this.resolversById.set(resolverId, resolve); + }); + }); + + // Resolve any waitpoints we received before the resolvers were created + this.resolvePendingWaitpoints(); + + await lifecycleHooks.callOnWaitHookListeners({ + type: "batch", + batchId: params.id, + runCount: params.runCount, + }); + + const waitpoints = await this.suspendable(Promise.all(promises)); + + await lifecycleHooks.callOnResumeHookListeners({ + type: "batch", + batchId: params.id, + runCount: params.runCount, + }); + + return { + id: params.id, + items: waitpoints.map(this.waitpointToTaskRunExecutionResult), + }; + }); + } + + async waitForWaitpoint({ + waitpointFriendlyId, + finishDate, + }: { + waitpointFriendlyId: string; + finishDate?: Date; + }): Promise { + return this._preventMultipleWaits(async () => { + const promise = new Promise((resolve) => { + this.resolversById.set(waitpointFriendlyId as ResolverId, resolve); + }); + + // Resolve any waitpoints we received before the resolver was created + this.resolvePendingWaitpoints(); + + if (finishDate) { + await lifecycleHooks.callOnWaitHookListeners({ + type: "duration", + date: finishDate, + }); + } else { + await lifecycleHooks.callOnWaitHookListeners({ + type: "token", + token: waitpointFriendlyId, + }); + } + + const waitpoint = await this.suspendable(promise); + + if (finishDate) { + await lifecycleHooks.callOnResumeHookListeners({ + type: "duration", + date: finishDate, + }); + } else { + await lifecycleHooks.callOnResumeHookListeners({ + type: "token", + token: waitpointFriendlyId, + }); + } + + return { + ok: !waitpoint.outputIsError, + output: waitpoint.output, + outputType: waitpoint.outputType, + }; + }); + } + + async resolveWaitpoints(waitpoints: CompletedWaitpoint[]): Promise { + await Promise.all(waitpoints.map((waitpoint) => this.resolveWaitpoint(waitpoint))); + } + + private resolverIdFromWaitpoint(waitpoint: CompletedWaitpoint): ResolverId | null { + let id: string; + + switch (waitpoint.type) { + case "RUN": { + if (!waitpoint.completedByTaskRun) { + this.debugLog("no completedByTaskRun for RUN waitpoint", { + waitpoint: this.waitpointForDebugLog(waitpoint), + }); + return null; + } + + if (waitpoint.completedByTaskRun.batch) { + // This run is part of a batch + id = `${waitpoint.completedByTaskRun.batch.friendlyId}_${waitpoint.index}`; + } else { + // This run is NOT part of a batch + id = waitpoint.completedByTaskRun.friendlyId; + } + + break; + } + case "BATCH": { + if (!waitpoint.completedByBatch) { + this.debugLog("no completedByBatch for BATCH waitpoint", { + waitpoint: this.waitpointForDebugLog(waitpoint), + }); + return null; + } + + id = waitpoint.completedByBatch.friendlyId; + break; + } + case "MANUAL": + case "DATETIME": { + id = waitpoint.friendlyId; + break; + } + default: { + assertExhaustive(waitpoint.type); + } + } + + return id as ResolverId; + } + + private resolveWaitpoint(waitpoint: CompletedWaitpoint, resolverId?: ResolverId | null): void { + // This is spammy, don't make this a debug log + this.log("resolveWaitpoint", waitpoint); + + if (waitpoint.type === "BATCH") { + // We currently ignore these, they're not required to resume after a batch completes + this.debugLog("ignoring BATCH waitpoint", { + waitpoint: this.waitpointForDebugLog(waitpoint), + }); + return; + } + + resolverId = resolverId ?? this.resolverIdFromWaitpoint(waitpoint); + + if (!resolverId) { + this.debugLog("no resolverId for waitpoint", { + waitpoint: this.waitpointForDebugLog(waitpoint), + }); + + // No need to store the waitpoint, we'll never be able to resolve it + return; + } + + const resolve = this.resolversById.get(resolverId); + + if (!resolve) { + this.debugLog("no resolver found for resolverId", { + resolverId, + waitpoint: this.waitpointForDebugLog(waitpoint), + }); + + // Store the waitpoint for later if we can't find a resolver + this.waitpointsByResolverId.set(resolverId, waitpoint); + + return; + } + + // Ensure current time is accurate before resolving the waitpoint + clock.reset(); + + resolve(waitpoint); + + this.resolversById.delete(resolverId); + this.waitpointsByResolverId.delete(resolverId); + } + + private resolvePendingWaitpoints(): void { + for (const [resolverId, waitpoint] of this.waitpointsByResolverId.entries()) { + this.resolveWaitpoint(waitpoint, resolverId); + } + } + + private setSuspendable(suspendable: boolean): void { + this.ipc.send("SET_SUSPENDABLE", { suspendable }); + } + + private async suspendable(promise: Promise): Promise { + this.setSuspendable(true); + const [error, result] = await tryCatch(promise); + this.setSuspendable(false); + + if (error) { + this.debugLog("error in suspendable wrapper", { error: String(error) }); + throw error; + } + + return result; + } + + private waitpointToTaskRunExecutionResult(waitpoint: CompletedWaitpoint): TaskRunExecutionResult { + if (!waitpoint.completedByTaskRun?.friendlyId) throw new Error("Missing completedByTaskRun"); + + if (waitpoint.outputIsError) { + return { + ok: false, + id: waitpoint.completedByTaskRun.friendlyId, + error: waitpoint.output + ? JSON.parse(waitpoint.output) + : { + type: "STRING_ERROR", + message: "Missing error output", + }, + } satisfies TaskRunFailedExecutionResult; + } else { + return { + ok: true, + id: waitpoint.completedByTaskRun.friendlyId, + output: waitpoint.output, + outputType: waitpoint.outputType ?? "application/json", + } satisfies TaskRunSuccessfulExecutionResult; + } + } + + private waitpointForDebugLog(waitpoint: CompletedWaitpoint): DebugLogPropertiesInput { + const { completedAfter, completedAt, output, ...rest } = waitpoint; + + return { + ...rest, + output: output?.slice(0, 100), + completedAfter: completedAfter?.toISOString(), + completedAt: completedAt?.toISOString(), + completedAfterDate: completedAfter, + completedAtDate: completedAt, + }; + } + + private debugLog(message: string, properties?: DebugLogPropertiesInput) { + if (this.showLogs) { + console.log(`[${new Date().toISOString()}] ${message}`, { + runtimeStatus: this.status, + ...properties, + }); + } + + this.ipc.send("SEND_DEBUG_LOG", { + message, + properties: { + runtimeStatus: this.status, + ...properties, + }, + }); + } + + private log(message: string, ...args: any[]) { + if (!this.showLogs) return; + console.log(`[${new Date().toISOString()}] ${message}`, args); + } + + private get status() { + return { + resolversById: Array.from(this.resolversById.keys()), + waitpointsByResolverId: Array.from(this.waitpointsByResolverId.keys()), + }; + } +} diff --git a/packages/core/src/v3/schemas/messages.ts b/packages/core/src/v3/schemas/messages.ts index 2bbfed7aa2..6c813b357e 100644 --- a/packages/core/src/v3/schemas/messages.ts +++ b/packages/core/src/v3/schemas/messages.ts @@ -13,12 +13,12 @@ import { ProdTaskRunExecution, ProdTaskRunExecutionPayload, RunEngineVersionSchema, - RuntimeWait, TaskRunExecutionLazyAttemptPayload, TaskRunExecutionMetrics, WaitReason, } from "./schemas.js"; import { CompletedWaitpoint } from "./runEngine.js"; +import { DebugLogPropertiesInput } from "../runEngineWorker/index.js"; export const AckCallbackResult = z.discriminatedUnion("success", [ z.object({ @@ -172,37 +172,20 @@ export const ExecutorToWorkerMessageCatalog = { id: z.string(), }), }, - READY_TO_DISPOSE: { - message: z.undefined(), - }, - WAIT_FOR_DURATION: { - message: z.object({ - version: z.literal("v1").default("v1"), - ms: z.number(), - now: z.number(), - waitThresholdInMs: z.number(), - }), + UNCAUGHT_EXCEPTION: { + message: UncaughtExceptionMessage, }, - WAIT_FOR_TASK: { + SEND_DEBUG_LOG: { message: z.object({ version: z.literal("v1").default("v1"), - friendlyId: z.string(), + message: z.string(), + properties: DebugLogPropertiesInput.optional(), }), }, - WAIT_FOR_BATCH: { + SET_SUSPENDABLE: { message: z.object({ version: z.literal("v1").default("v1"), - batchFriendlyId: z.string(), - runFriendlyIds: z.string().array(), - }), - }, - UNCAUGHT_EXCEPTION: { - message: UncaughtExceptionMessage, - }, - WAIT: { - message: z.object({ - version: z.literal("v1").default("v1"), - wait: RuntimeWait, + suspendable: z.boolean(), }), }, }; @@ -219,24 +202,6 @@ export const WorkerToExecutorMessageCatalog = { isWarmStart: z.boolean().optional(), }), }, - TASK_RUN_COMPLETED_NOTIFICATION: { - message: z.discriminatedUnion("version", [ - z.object({ - version: z.literal("v1"), - completion: TaskRunExecutionResult, - execution: TaskRunExecution, - }), - z.object({ - version: z.literal("v2"), - completion: TaskRunExecutionResult, - }), - ]), - }, - WAIT_COMPLETED_NOTIFICATION: { - message: z.object({ - version: z.literal("v1").default("v1"), - }), - }, FLUSH: { message: z.object({ timeoutInMs: z.number(), @@ -249,18 +214,7 @@ export const WorkerToExecutorMessageCatalog = { }), callback: z.void(), }, - WAITPOINT_CREATED: { - message: z.object({ - version: z.literal("v1").default("v1"), - wait: z.object({ - id: z.string(), - }), - waitpoint: z.object({ - id: z.string(), - }), - }), - }, - WAITPOINT_COMPLETED: { + RESOLVE_WAITPOINT: { message: z.object({ version: z.literal("v1").default("v1"), waitpoint: CompletedWaitpoint, diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index efcfe8086e..2dd75e0ddd 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -109,6 +109,7 @@ const ExecutionSnapshot = z.object({ friendlyId: z.string(), executionStatus: z.enum(Object.values(TaskRunExecutionStatus) as [TaskRunExecutionStatus]), description: z.string(), + createdAt: z.coerce.date(), }); const BaseRunMetadata = z.object({ diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 302fcf5f37..40ab6116ad 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -275,20 +275,6 @@ export const TaskRunExecutionLazyAttemptPayload = z.object({ export type TaskRunExecutionLazyAttemptPayload = z.infer; -export const RuntimeWait = z.discriminatedUnion("type", [ - z.object({ - type: z.literal("DATETIME"), - id: z.string(), - date: z.coerce.date(), - }), - z.object({ - type: z.literal("MANUAL"), - id: z.string(), - }), -]); - -export type RuntimeWait = z.infer; - export const ManualCheckpointMetadata = z.object({ /** NOT a friendly ID */ attemptId: z.string(), diff --git a/packages/core/src/v3/serverOnly/httpServer.ts b/packages/core/src/v3/serverOnly/httpServer.ts index 3aa327a8bd..bdbfcb7589 100644 --- a/packages/core/src/v3/serverOnly/httpServer.ts +++ b/packages/core/src/v3/serverOnly/httpServer.ts @@ -201,7 +201,7 @@ export class HttpServer { ); if (error) { - logger.error("Route handler error", { error }); + logger.error("Route handler error", { error: this.formatError(error) }); return reply.empty(500); } @@ -210,7 +210,7 @@ export class HttpServer { return; } } catch (error) { - logger.error("Failed to handle request", { error }); + logger.error("Failed to handle request", { error: this.formatError(error) }); return reply.empty(500); } finally { this.collectMetrics(req, res, startTime); @@ -364,4 +364,16 @@ export class HttpServer { return null; } + + private formatError(error: unknown): string | Record { + if (error instanceof Error) { + return { + name: error.name, + message: error.message, + stack: error.stack, + }; + } + + return String(error); + } } diff --git a/packages/core/src/v3/utils/interval.ts b/packages/core/src/v3/utils/interval.ts index 9470ae2bb2..7ea7434718 100644 --- a/packages/core/src/v3/utils/interval.ts +++ b/packages/core/src/v3/utils/interval.ts @@ -40,13 +40,19 @@ export class IntervalService { } } - stop() { + stop(): { isExecuting: boolean } { + const returnValue = { + isExecuting: this._isExecuting, + }; + if (!this._isEnabled) { - return; + return returnValue; } this._isEnabled = false; this.#clearNextInterval(); + + return returnValue; } resetCurrentInterval() { diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index de89b5587b..5ff626f2ea 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -21,7 +21,7 @@ export { ProdUsageManager, type ProdUsageManagerOptions } from "../usage/prodUsa export { UsageTimeoutManager } from "../timeout/usageTimeoutManager.js"; export { StandardMetadataManager } from "../runMetadata/manager.js"; export { StandardWaitUntilManager } from "../waitUntil/manager.js"; -export { ManagedRuntimeManager } from "../runtime/managedRuntimeManager.js"; +export { SharedRuntimeManager } from "../runtime/sharedRuntimeManager.js"; export * from "../runEngineWorker/index.js"; export { StandardRunTimelineMetricsManager } from "../runTimelineMetrics/runTimelineMetricsManager.js"; export { WarmStartClient, type WarmStartClientOptions } from "../workers/warmStartClient.js"; diff --git a/scripts/publish-prerelease.sh b/scripts/publish-prerelease.sh index 763a80fbe0..66602fb069 100755 --- a/scripts/publish-prerelease.sh +++ b/scripts/publish-prerelease.sh @@ -55,7 +55,8 @@ if output=$(pnpm exec changeset version --snapshot $version 2>&1); then exit 0 fi else - echo "Error running changeset version command" + echo "$output" + echo "Error running changeset version command, detailed output above" exit 1 fi