Fix controller waitpoint resolution, suspendable state, and snapshot race conditions #2006

Merged
merged 69 commits into from
May 7, 2025
Changes from 67 commits
Commits
b384dad
remove dead code
nicktrn Apr 29, 2025
082926a
rename managed to shared runtime manager
nicktrn Apr 29, 2025
786416c
rename to resolve waitpoint for clarity
nicktrn Apr 29, 2025
b5ae558
add resolver id helper
nicktrn Apr 29, 2025
4d46d20
store and correctly resolve waitpoints that come in early
nicktrn Apr 29, 2025
e72bcc8
fix ipc message type change
nicktrn Apr 29, 2025
38273ae
branded type for resolver ids
nicktrn Apr 29, 2025
f190420
add fixme comments
nicktrn Apr 29, 2025
3592db2
remove more unused ipc schemas
nicktrn Apr 29, 2025
d6dfc66
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn Apr 29, 2025
b13d8b7
fix entitlement validation when client doesn't exist
nicktrn Apr 29, 2025
3cb44a1
restore hello world reference workspace imports
nicktrn Apr 29, 2025
9e2729c
runtime manager debug logs
nicktrn Apr 29, 2025
752770f
prefix engine run logs
nicktrn Apr 30, 2025
5f52292
managed run logger accepts nested props
nicktrn Apr 30, 2025
bb9ac50
runtime suspendable state and improved logs
nicktrn Apr 30, 2025
3d978d3
require suspendable state for checkpoints, fix snapshot processing queue
nicktrn Apr 30, 2025
63b3bc0
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn Apr 30, 2025
479c304
add terminal link as cli module so we can more easily patch it
nicktrn Apr 30, 2025
a053114
apply cursor patch
nicktrn Apr 30, 2025
c3333ab
add license info
nicktrn Apr 30, 2025
8bc2e33
remove terminal-link package and add deprecation notice
nicktrn Apr 30, 2025
784e151
remove old patch
nicktrn Apr 30, 2025
641ca67
remove terminal-link from sdk
nicktrn Apr 30, 2025
ccd9e5b
rename snapshot module
nicktrn Apr 30, 2025
f62047b
add cli test tsconfig
nicktrn Apr 30, 2025
c3dbb8a
add run logger base type
nicktrn Apr 30, 2025
140e2d7
add snapshot manager tests
nicktrn Apr 30, 2025
8df5122
Merge branch 'fix/terminal-links' into fix/resolve-waitpoints
nicktrn Apr 30, 2025
ddb40ae
fix cli builds
nicktrn May 1, 2025
0ed24db
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 1, 2025
54db582
improve QUEUED_EXECUTING test
nicktrn May 1, 2025
b17a947
changeset
nicktrn May 1, 2025
9292667
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 1, 2025
68ea4c4
make testcontainers wait until container has stopped
nicktrn May 1, 2025
c36e274
require unit tests for publishing again
nicktrn May 1, 2025
87b0ce1
avoid mutation during iteration when resolving pending waitpoints
nicktrn May 1, 2025
2622b0d
improve debug logs and make them less noisy
nicktrn May 1, 2025
ffa2a73
always update poller snapshot id for accurate logs
nicktrn May 1, 2025
7a37a26
detach task run process handlers
nicktrn May 1, 2025
55b835d
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 1, 2025
8fd09e3
check for env overrides in a few more places and add verbose logs
nicktrn May 1, 2025
da24a79
log when poller is still executing when we stop it
nicktrn May 1, 2025
c62cf10
add supervisor to publish workflow
nicktrn May 1, 2025
467f9de
always print full deploy logs in CI
nicktrn May 2, 2025
246c1a9
Revert "avoid mutation during iteration when resolving pending waitpo…
nicktrn May 2, 2025
9403409
disable pre
nicktrn May 2, 2025
4607b05
print prerelease script errors
nicktrn May 2, 2025
ed1a44c
Revert "disable pre"
nicktrn May 2, 2025
213a983
misc fixes
nicktrn May 2, 2025
344e2e5
better debug logs
nicktrn May 2, 2025
cc0feb0
Merge remote-tracking branch 'origin/main' into fix/resolve-waitpoints
nicktrn May 6, 2025
b4c61af
add snapshots since methods and route
nicktrn May 6, 2025
a7e4ddd
prep for snapshots since
nicktrn May 6, 2025
52a57a4
improve deprecated execution detection
nicktrn May 6, 2025
16b344d
update supervisor and schema
nicktrn May 6, 2025
b09178a
properly log http server errors
nicktrn May 6, 2025
59d9784
detect restore after failed snapshot fetch
nicktrn May 6, 2025
e26dcb2
run and snapshot id can be overridden
nicktrn May 6, 2025
7c1a816
fix restore detection
nicktrn May 6, 2025
6b16df0
fix deprecation checks, move into snapshot manager
nicktrn May 6, 2025
2c8bcc2
less logs
nicktrn May 6, 2025
1ce6ddc
rename snapshot manager stop
nicktrn May 6, 2025
188a023
restore detection was moved into snapshot manager
nicktrn May 6, 2025
4a27eea
fix notifier logs
nicktrn May 7, 2025
8c3da69
make runtime manager status a debug log
nicktrn May 7, 2025
bd8efa7
no need to attach runtime status twice
nicktrn May 7, 2025
997185c
findUnique -> findFirst
nicktrn May 7, 2025
6769aeb
sort snapshots by created at everywhere
nicktrn May 7, 2025
8 changes: 8 additions & 0 deletions .changeset/plenty-dolphins-act.md
@@ -0,0 +1,8 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

- Correctly resolve waitpoints that come in early
- Ensure correct state before requesting suspension
- Fix race conditions in snapshot processing
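
The first changeset item (resolving waitpoints that come in early) can be illustrated with a small buffering registry. This is only a sketch of the general technique, not the code from this PR: `WaitpointResolverRegistry`, `ResolverId`, `asResolverId`, and the trimmed-down `CompletedWaitpointLike` shape are all hypothetical names chosen for illustration.

```typescript
// Hypothetical branded ID type, so plain strings cannot be passed by accident.
type ResolverId = string & { readonly __brand: "ResolverId" };

// Hypothetical, trimmed-down waitpoint shape used only in this sketch.
type CompletedWaitpointLike = { id: string; output?: string };

type Resolver = (waitpoint: CompletedWaitpointLike) => void;

const asResolverId = (waitpointId: string): ResolverId => waitpointId as ResolverId;

class WaitpointResolverRegistry {
  // Resolvers registered by code that is currently waiting.
  private readonly resolvers = new Map<ResolverId, Resolver>();
  // Completions that arrived before anyone registered a resolver.
  private readonly early = new Map<ResolverId, CompletedWaitpointLike>();

  // Called when a completed waitpoint arrives.
  resolve(id: ResolverId, waitpoint: CompletedWaitpointLike): void {
    const resolver = this.resolvers.get(id);
    if (!resolver) {
      // Nobody is waiting yet: store the completion instead of dropping it.
      this.early.set(id, waitpoint);
      return;
    }
    this.resolvers.delete(id);
    resolver(waitpoint);
  }

  // Called when the task starts waiting on a waitpoint.
  register(id: ResolverId, resolver: Resolver): void {
    const pending = this.early.get(id);
    if (pending) {
      // The completion came in early: resolve immediately.
      this.early.delete(id);
      resolver(pending);
      return;
    }
    this.resolvers.set(id, resolver);
  }
}
```

In this sketch a completion is delivered exactly once regardless of whether `resolve` or `register` is called first, which is the property the changeset entry describes.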
5 changes: 5 additions & 0 deletions .changeset/sweet-dolphins-invent.md
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Always print full deploy logs in CI
26 changes: 26 additions & 0 deletions apps/supervisor/src/workloadServer/index.ts
@@ -17,6 +17,7 @@ import {
WorkloadRunAttemptStartRequestBody,
type WorkloadRunAttemptStartResponseBody,
type WorkloadRunLatestSnapshotResponseBody,
WorkloadRunSnapshotsSinceResponseBody,
type WorkloadServerToClientEvents,
type WorkloadSuspendRunResponseBody,
} from "@trigger.dev/core/v3/workers";
@@ -341,6 +342,31 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
} satisfies WorkloadRunLatestSnapshotResponseBody);
},
})
.route(
"/api/v1/workload-actions/runs/:runFriendlyId/snapshots/since/:snapshotFriendlyId",
"GET",
{
paramsSchema: WorkloadActionParams,
handler: async ({ req, reply, params }) => {
const sinceSnapshotResponse = await this.workerClient.getSnapshotsSince(
params.runFriendlyId,
params.snapshotFriendlyId,
this.runnerIdFromRequest(req)
);

if (!sinceSnapshotResponse.success) {
console.error("Failed to get snapshots since", {
runId: params.runFriendlyId,
error: sinceSnapshotResponse.error,
});
reply.empty(500);
return;
}

reply.json(sinceSnapshotResponse.data satisfies WorkloadRunSnapshotsSinceResponseBody);
},
}
)
.route("/api/v1/workload-actions/runs/:runFriendlyId/logs/debug", "POST", {
paramsSchema: WorkloadActionParams.pick({ runFriendlyId: true }),
bodySchema: WorkloadDebugLogRequestBody,
@@ -0,0 +1,30 @@
import { json, TypedResponse } from "@remix-run/server-runtime";
import { WorkerApiRunSnapshotsSinceResponseBody } from "@trigger.dev/core/v3/workers";
import { z } from "zod";
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const loader = createLoaderWorkerApiRoute(
{
params: z.object({
runFriendlyId: z.string(),
snapshotId: z.string(),
}),
},
async ({
authenticatedWorker,
params,
}): Promise<TypedResponse<WorkerApiRunSnapshotsSinceResponseBody>> => {
const { runFriendlyId, snapshotId } = params;

const snapshots = await authenticatedWorker.getSnapshotsSince({
runFriendlyId,
snapshotId,
});

if (!snapshots) {
throw new Error("Failed to retrieve snapshots since given snapshot");
}

return json({ snapshots });
}
);
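
A caller of the new endpoint might use the response to catch up on snapshots it has missed. The sketch below is an assumption about how such a consumer could look, not code from this PR: `SnapshotSummary`, `snapshotsSincePath`, and `applySnapshotsInOrder` are hypothetical names, and only the path template is taken from the supervisor route in this diff.

```typescript
// Hypothetical, trimmed-down shape: the real RunExecutionData carries more fields.
type SnapshotSummary = {
  snapshot: { friendlyId: string; executionStatus: string };
};

// Builds the path of the supervisor "snapshots since" route from this diff.
function snapshotsSincePath(runFriendlyId: string, snapshotFriendlyId: string): string {
  const run = encodeURIComponent(runFriendlyId);
  const snapshot = encodeURIComponent(snapshotFriendlyId);
  return `/api/v1/workload-actions/runs/${run}/snapshots/since/${snapshot}`;
}

// Applies snapshots oldest-first and returns the new cursor, i.e. the friendly ID
// of the last snapshot that was handled. An empty list leaves the cursor unchanged.
function applySnapshotsInOrder(
  cursor: string,
  snapshots: SnapshotSummary[],
  handle: (item: SnapshotSummary) => void
): string {
  let latest = cursor;
  for (const item of snapshots) {
    handle(item);
    latest = item.snapshot.friendlyId;
  }
  return latest;
}
```

The sketch relies on the response already being sorted by creation time in ascending order, so the consumer only walks the list and advances its cursor.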
6 changes: 4 additions & 2 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
@@ -401,7 +401,7 @@ export function registerRunEngineEventBusHandlers() {
engine.eventBus.on("executionSnapshotCreated", async ({ time, run, snapshot }) => {
const eventResult = await recordRunDebugLog(
run.id,
-`${snapshot.executionStatus} - ${snapshot.description}`,
+`[engine] ${snapshot.executionStatus} - ${snapshot.description}`,
{
attributes: {
properties: {
@@ -450,6 +450,7 @@ export function registerRunEngineEventBusHandlers() {
// Record notification event
const eventResult = await recordRunDebugLog(
run.id,
// don't prefix this with [engine] - "run:notify" is the correct prefix
`run:notify platform -> supervisor: ${snapshot.executionStatus}`,
{
attributes: {
@@ -479,6 +480,7 @@ export function registerRunEngineEventBusHandlers() {
// Record notification event
const eventResult = await recordRunDebugLog(
run.id,
// don't prefix this with [engine] - "run:notify" is the correct prefix
`run:notify ERROR platform -> supervisor: ${snapshot.executionStatus}`,
{
attributes: {
@@ -505,7 +507,7 @@ export function registerRunEngineEventBusHandlers() {
engine.eventBus.on("incomingCheckpointDiscarded", async ({ time, run, snapshot, checkpoint }) => {
const eventResult = await recordRunDebugLog(
run.id,
-`Checkpoint discarded: ${checkpoint.discardReason}`,
+`[engine] Checkpoint discarded: ${checkpoint.discardReason}`,
{
attributes: {
properties: {
@@ -759,6 +759,19 @@ export class AuthenticatedWorkerInstance extends WithRunEngine {
});
}

async getSnapshotsSince({
runFriendlyId,
snapshotId,
}: {
runFriendlyId: string;
snapshotId: string;
}) {
return await this._engine.getSnapshotsSince({
runId: fromFriendlyId(runFriendlyId),
snapshotId: fromFriendlyId(snapshotId),
});
}

toJSON(): WorkerGroupTokenAuthenticationResponse {
if (this.type === WorkerInstanceGroupType.MANAGED) {
return {
61 changes: 24 additions & 37 deletions internal-packages/run-engine/src/engine/index.ts
@@ -43,6 +43,8 @@ import { EnqueueSystem } from "./systems/enqueueSystem.js";
import {
ExecutionSnapshotSystem,
getLatestExecutionSnapshot,
getExecutionSnapshotsSince,
executionDataFromSnapshot,
} from "./systems/executionSnapshotSystem.js";
import { PendingVersionSystem } from "./systems/pendingVersionSystem.js";
import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";
@@ -1100,43 +1102,31 @@ export class RunEngine {
     const prisma = tx ?? this.prisma;
     try {
       const snapshot = await getLatestExecutionSnapshot(prisma, runId);
+      return executionDataFromSnapshot(snapshot);
+    } catch (e) {
+      this.logger.error("Failed to getRunExecutionData", {
+        message: e instanceof Error ? e.message : e,
+      });
+      return null;
+    }
+  }

-      const executionData: RunExecutionData = {
-        version: "1" as const,
-        snapshot: {
-          id: snapshot.id,
-          friendlyId: snapshot.friendlyId,
-          executionStatus: snapshot.executionStatus,
-          description: snapshot.description,
-        },
-        run: {
-          id: snapshot.runId,
-          friendlyId: snapshot.runFriendlyId,
-          status: snapshot.runStatus,
-          attemptNumber: snapshot.attemptNumber ?? undefined,
-        },
-        batch: snapshot.batchId
-          ? {
-              id: snapshot.batchId,
-              friendlyId: BatchId.toFriendlyId(snapshot.batchId),
-            }
-          : undefined,
-        checkpoint: snapshot.checkpoint
-          ? {
-              id: snapshot.checkpoint.id,
-              friendlyId: snapshot.checkpoint.friendlyId,
-              type: snapshot.checkpoint.type,
-              location: snapshot.checkpoint.location,
-              imageRef: snapshot.checkpoint.imageRef,
-              reason: snapshot.checkpoint.reason ?? undefined,
-            }
-          : undefined,
-        completedWaitpoints: snapshot.completedWaitpoints,
-      };
+  async getSnapshotsSince({
+    runId,
+    snapshotId,
+    tx,
+  }: {
+    runId: string;
+    snapshotId: string;
+    tx?: PrismaClientOrTransaction;
+  }): Promise<RunExecutionData[] | null> {
+    const prisma = tx ?? this.prisma;

-      return executionData;
+    try {
+      const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
+      return snapshots.map(executionDataFromSnapshot);
     } catch (e) {
-      this.logger.error("Failed to getRunExecutionData", {
+      this.logger.error("Failed to getSnapshotsSince", {
         message: e instanceof Error ? e.message : e,
       });
       return null;
@@ -1158,9 +1148,6 @@ export class RunEngine {
}
}

-//#endregion
-
-//#region Heartbeat
async #handleStalledSnapshot({
runId,
snapshotId,
@@ -1,4 +1,4 @@
-import { CompletedWaitpoint, ExecutionResult } from "@trigger.dev/core/v3";
+import { CompletedWaitpoint, ExecutionResult, RunExecutionData } from "@trigger.dev/core/v3";
import { BatchId, RunId, SnapshotId } from "@trigger.dev/core/v3/isomorphic";
import {
Prisma,
@@ -17,31 +17,23 @@ export type ExecutionSnapshotSystemOptions = {
heartbeatTimeouts: HeartbeatTimeouts;
};

-export interface LatestExecutionSnapshot extends TaskRunExecutionSnapshot {
+export interface EnhancedExecutionSnapshot extends TaskRunExecutionSnapshot {
friendlyId: string;
runFriendlyId: string;
checkpoint: TaskRunCheckpoint | null;
completedWaitpoints: CompletedWaitpoint[];
}

-/* Gets the most recent valid snapshot for a run */
-export async function getLatestExecutionSnapshot(
-  prisma: PrismaClientOrTransaction,
-  runId: string
-): Promise<LatestExecutionSnapshot> {
-  const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
-    where: { runId, isValid: true },
-    include: {
-      completedWaitpoints: true,
-      checkpoint: true,
-    },
-    orderBy: { createdAt: "desc" },
-  });
-
-  if (!snapshot) {
-    throw new Error(`No execution snapshot found for TaskRun ${runId}`);
-  }
+type ExecutionSnapshotWithCheckAndWaitpoints = Prisma.TaskRunExecutionSnapshotGetPayload<{
+  include: {
+    checkpoint: true;
+    completedWaitpoints: true;
+  };
+}>;
+
+function enhanceExecutionSnapshot(
+  snapshot: ExecutionSnapshotWithCheckAndWaitpoints
+): EnhancedExecutionSnapshot {
return {
...snapshot,
friendlyId: SnapshotId.toFriendlyId(snapshot.id),
@@ -99,6 +91,27 @@ export async function getLatestExecutionSnapshot(
};
}

/* Gets the most recent valid snapshot for a run */
export async function getLatestExecutionSnapshot(
prisma: PrismaClientOrTransaction,
runId: string
): Promise<EnhancedExecutionSnapshot> {
const snapshot = await prisma.taskRunExecutionSnapshot.findFirst({
where: { runId, isValid: true },
include: {
completedWaitpoints: true,
checkpoint: true,
},
orderBy: { createdAt: "desc" },
});

if (!snapshot) {
throw new Error(`No execution snapshot found for TaskRun ${runId}`);
}

return enhanceExecutionSnapshot(snapshot);
}

export async function getExecutionSnapshotCompletedWaitpoints(
prisma: PrismaClientOrTransaction,
snapshotId: string
@@ -141,6 +154,72 @@ export function executionResultFromSnapshot(snapshot: TaskRunExecutionSnapshot):
};
}

export function executionDataFromSnapshot(snapshot: EnhancedExecutionSnapshot): RunExecutionData {
return {
version: "1" as const,
snapshot: {
id: snapshot.id,
friendlyId: snapshot.friendlyId,
executionStatus: snapshot.executionStatus,
description: snapshot.description,
},
run: {
id: snapshot.runId,
friendlyId: snapshot.runFriendlyId,
status: snapshot.runStatus,
attemptNumber: snapshot.attemptNumber ?? undefined,
},
batch: snapshot.batchId
? {
id: snapshot.batchId,
friendlyId: BatchId.toFriendlyId(snapshot.batchId),
}
: undefined,
checkpoint: snapshot.checkpoint
? {
id: snapshot.checkpoint.id,
friendlyId: snapshot.checkpoint.friendlyId,
type: snapshot.checkpoint.type,
location: snapshot.checkpoint.location,
imageRef: snapshot.checkpoint.imageRef,
reason: snapshot.checkpoint.reason ?? undefined,
}
: undefined,
completedWaitpoints: snapshot.completedWaitpoints,
};
}

export async function getExecutionSnapshotsSince(
prisma: PrismaClientOrTransaction,
runId: string,
sinceSnapshotId: string
): Promise<EnhancedExecutionSnapshot[]> {
// Find the createdAt of the sinceSnapshotId
const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findUnique({
where: { id: sinceSnapshotId },
select: { createdAt: true },
});

if (!sinceSnapshot) {
throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
}

const snapshots = await prisma.taskRunExecutionSnapshot.findMany({
where: {
runId,
isValid: true,
createdAt: { gt: sinceSnapshot.createdAt },
},
include: {
completedWaitpoints: true,
checkpoint: true,
},
orderBy: { createdAt: "asc" },
});

return snapshots.map(enhanceExecutionSnapshot);
}
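
The semantics of `getExecutionSnapshotsSince` can be modelled in memory, which may help when reasoning about edge cases. This is a sketch under stated assumptions rather than the engine code: `SnapshotRow` and `snapshotsSinceInMemory` are hypothetical names, and an array stands in for the Prisma table.

```typescript
// Hypothetical row shape holding only the columns the query above touches.
type SnapshotRow = {
  id: string;
  runId: string;
  isValid: boolean;
  createdAt: Date;
};

// In-memory model of the two-step query: look up the reference snapshot,
// then return valid snapshots of the run created strictly after it, oldest first.
function snapshotsSinceInMemory(
  rows: SnapshotRow[],
  runId: string,
  sinceSnapshotId: string
): SnapshotRow[] {
  const since = rows.find((row) => row.id === sinceSnapshotId);
  if (!since) {
    throw new Error(`No execution snapshot found for id ${sinceSnapshotId}`);
  }
  const sinceTime = since.createdAt.getTime();

  return rows
    .filter(
      (row) => row.runId === runId && row.isValid && row.createdAt.getTime() > sinceTime
    )
    .sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime());
}
```

Because the comparison is strictly greater-than, the reference snapshot itself is never returned, and in this model neither is any other snapshot that shares its exact `createdAt` value.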

export class ExecutionSnapshotSystem {
private readonly $: SystemResources;
private readonly heartbeatTimeouts: HeartbeatTimeouts;