Skip to content

Commit 56a5b58

Browse files
authored
Add acking to RESUME_AFTER_DEPENDENCY message to the coordinator (#1313)
* Fix for run filtering not working with some special characters (double encoded)
* Add the full dependentTaskAttempt to a ResumeBatchRunService log
* Added RESUME_AFTER_DEPENDENCY_WITH_ACK
* Set the delay to 5s
* If a checkpoint has been created, the coordinator won’t continue the run with RESUME_AFTER_DEPENDENCY_WITH_ACK
* If we’re keeping the run alive then set socket.data.requiresCheckpointResumeWithMessage to undefined
* Log out the data before and after setting socket.data.requiresCheckpointResumeWithMessage
1 parent 74c2076 commit 56a5b58

File tree

5 files changed

+138
-25
lines changed

5 files changed

+138
-25
lines changed

apps/coordinator/src/index.ts

+69
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,49 @@ class TaskCoordinator {
162162

163163
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
164164
},
165+
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
166+
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
167+
168+
if (!taskSocket) {
169+
logger.log("Socket for attempt not found", {
170+
attemptFriendlyId: message.attemptFriendlyId,
171+
});
172+
return {
173+
success: false,
174+
error: {
175+
name: "SocketNotFoundError",
176+
message: "Socket for attempt not found",
177+
},
178+
};
179+
}
180+
181+
//if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
182+
if (taskSocket.data.requiresCheckpointResumeWithMessage) {
183+
logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", {
184+
socketData: taskSocket.data,
185+
});
186+
187+
return {
188+
success: false,
189+
error: {
190+
name: "CheckpointMessagePresentError",
191+
message:
192+
"Checkpoint message is present, so we need to kill the process and resume from the queue.",
193+
},
194+
};
195+
}
196+
197+
await chaosMonkey.call();
198+
199+
// In case the task resumed faster than we could checkpoint
200+
this.#cancelCheckpoint(message.runId);
201+
202+
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
203+
204+
return {
205+
success: true,
206+
};
207+
},
165208
RESUME_AFTER_DURATION: async (message) => {
166209
const taskSocket = await this.#getAttemptSocket(message.attemptFriendlyId);
167210

@@ -792,6 +835,18 @@ class TaskCoordinator {
792835
return;
793836
}
794837

838+
logger.log("WAIT_FOR_TASK checkpoint created", {
839+
checkpoint,
840+
socketData: socket.data,
841+
});
842+
843+
//setting this means we can only resume from a checkpoint
844+
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
845+
logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", {
846+
checkpoint,
847+
socketData: socket.data,
848+
});
849+
795850
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
796851
version: "v1",
797852
attemptFriendlyId: message.attemptFriendlyId,
@@ -804,6 +859,7 @@ class TaskCoordinator {
804859
});
805860

806861
if (ack?.keepRunAlive) {
862+
socket.data.requiresCheckpointResumeWithMessage = undefined;
807863
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
808864
return;
809865
}
@@ -862,6 +918,18 @@ class TaskCoordinator {
862918
return;
863919
}
864920

921+
logger.log("WAIT_FOR_BATCH checkpoint created", {
922+
checkpoint,
923+
socketData: socket.data,
924+
});
925+
926+
//setting this means we can only resume from a checkpoint
927+
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
928+
logger.log("WAIT_FOR_BATCH set checkpoint", {
929+
checkpoint,
930+
socketData: socket.data,
931+
});
932+
865933
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
866934
version: "v1",
867935
attemptFriendlyId: message.attemptFriendlyId,
@@ -875,6 +943,7 @@ class TaskCoordinator {
875943
});
876944

877945
if (ack?.keepRunAlive) {
946+
socket.data.requiresCheckpointResumeWithMessage = undefined;
878947
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
879948
return;
880949
}

apps/webapp/app/hooks/useSearchParam.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ export function useSearchParams() {
1818
}
1919

2020
if (typeof value === "string") {
21-
search.set(param, encodeURIComponent(value));
21+
search.set(param, value);
2222
continue;
2323
}
2424

2525
search.delete(param);
2626
for (const v of value) {
27-
search.append(param, encodeURIComponent(v));
27+
search.append(param, v);
2828
}
2929
}
3030
},

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

+35-8
Original file line numberDiff line numberDiff line change
@@ -725,20 +725,47 @@ export class SharedQueueConsumer {
725725
}
726726

727727
try {
728-
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY", {
729-
runId: resumableAttempt.taskRunId,
730-
attemptId: resumableAttempt.id,
731-
});
732-
733-
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
734-
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
735-
version: "v1",
728+
const resumeMessage = {
729+
version: "v1" as const,
736730
runId: resumableAttempt.taskRunId,
737731
attemptId: resumableAttempt.id,
738732
attemptFriendlyId: resumableAttempt.friendlyId,
739733
completions,
740734
executions,
735+
};
736+
737+
logger.debug("Broadcasting RESUME_AFTER_DEPENDENCY_WITH_ACK", { resumeMessage, message });
738+
739+
// The attempt should still be running so we can broadcast to all coordinators to resume immediately
740+
const responses = await socketIo.coordinatorNamespace
741+
.timeout(10_000)
742+
.emitWithAck("RESUME_AFTER_DEPENDENCY_WITH_ACK", resumeMessage);
743+
744+
logger.debug("RESUME_AFTER_DEPENDENCY_WITH_ACK received", {
745+
resumeMessage,
746+
responses,
747+
message,
741748
});
749+
750+
if (responses.length === 0) {
751+
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK no response", {
752+
resumeMessage,
753+
message,
754+
});
755+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
756+
return;
757+
}
758+
759+
const failed = responses.filter((response) => !response.success);
760+
if (failed.length > 0) {
761+
logger.error("RESUME_AFTER_DEPENDENCY_WITH_ACK failed", {
762+
resumeMessage,
763+
failed,
764+
message,
765+
});
766+
await this.#nackAndDoMoreWork(message.messageId, this._options.nextTickInterval, 5_000);
767+
return;
768+
}
742769
} catch (e) {
743770
if (e instanceof Error) {
744771
this._currentSpan?.recordException(e);

apps/webapp/app/v3/services/resumeBatchRun.server.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,9 @@ export class ResumeBatchRunService extends BaseService {
132132
if (wasUpdated) {
133133
logger.debug("ResumeBatchRunService: Resuming dependent run without checkpoint", {
134134
batchRunId: batchRun.id,
135-
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
135+
dependentTaskAttempt: batchRun.dependentTaskAttempt,
136+
checkpointEventId: batchRun.checkpointEventId,
137+
hasCheckpointEvent: !!batchRun.checkpointEventId,
136138
});
137139
await marqs?.replaceMessage(dependentRun.id, {
138140
type: "RESUME",

packages/core/src/v3/schemas/messages.ts

+29-14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,21 @@ import {
1515
WaitReason,
1616
} from "./schemas.js";
1717

18+
const ackCallbackResult = z.discriminatedUnion("success", [
19+
z.object({
20+
success: z.literal(false),
21+
error: z.object({
22+
name: z.string(),
23+
message: z.string(),
24+
stack: z.string().optional(),
25+
stderr: z.string().optional(),
26+
}),
27+
}),
28+
z.object({
29+
success: z.literal(true),
30+
}),
31+
]);
32+
1833
export const BackgroundWorkerServerMessages = z.discriminatedUnion("type", [
1934
z.object({
2035
type: z.literal("CANCEL_ATTEMPT"),
@@ -269,20 +284,7 @@ export const PlatformToProviderMessages = {
269284
projectId: z.string(),
270285
deploymentId: z.string(),
271286
}),
272-
callback: z.discriminatedUnion("success", [
273-
z.object({
274-
success: z.literal(false),
275-
error: z.object({
276-
name: z.string(),
277-
message: z.string(),
278-
stack: z.string().optional(),
279-
stderr: z.string().optional(),
280-
}),
281-
}),
282-
z.object({
283-
success: z.literal(true),
284-
}),
285-
]),
287+
callback: ackCallbackResult,
286288
},
287289
RESTORE: {
288290
message: z.object({
@@ -504,6 +506,7 @@ export const CoordinatorToPlatformMessages = {
504506
};
505507

506508
export const PlatformToCoordinatorMessages = {
509+
/** @deprecated use RESUME_AFTER_DEPENDENCY_WITH_ACK instead */
507510
RESUME_AFTER_DEPENDENCY: {
508511
message: z.object({
509512
version: z.literal("v1").default("v1"),
@@ -514,6 +517,17 @@ export const PlatformToCoordinatorMessages = {
514517
executions: TaskRunExecution.array(),
515518
}),
516519
},
520+
RESUME_AFTER_DEPENDENCY_WITH_ACK: {
521+
message: z.object({
522+
version: z.literal("v1").default("v1"),
523+
runId: z.string(),
524+
attemptId: z.string(),
525+
attemptFriendlyId: z.string(),
526+
completions: TaskRunExecutionResult.array(),
527+
executions: TaskRunExecution.array(),
528+
}),
529+
callback: ackCallbackResult,
530+
},
517531
RESUME_AFTER_DURATION: {
518532
message: z.object({
519533
version: z.literal("v1").default("v1"),
@@ -847,6 +861,7 @@ export const ProdWorkerSocketData = z.object({
847861
podName: z.string(),
848862
deploymentId: z.string(),
849863
deploymentVersion: z.string(),
864+
requiresCheckpointResumeWithMessage: z.string().optional(),
850865
});
851866

852867
export const CoordinatorSocketData = z.object({

0 commit comments

Comments
 (0)