Skip to content

Commit be20f92

Browse files
committed
only crash in case of readiness timeouts
1 parent 4ffacff commit be20f92

File tree

1 file changed

+28
-6
lines changed

1 file changed

+28
-6
lines changed

apps/coordinator/src/index.ts

+28-6
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ const chaosMonkey = new ChaosMonkey(
5555
!!process.env.CHAOS_MONKEY_DISABLE_DELAYS
5656
);
5757

58+
/**
 * Rejection value used when a run does not reach a checkpointable state
 * before the readiness timer fires.
 *
 * The handler that awaits the checkpointable state identifies this error via
 * `instanceof` and forwards `error.name` as the crash name, so `name` is set
 * explicitly here; a bare `extends Error {}` would report the generic
 * `"Error"` instead.
 *
 * @param message - Optional description; defaults to a generic timeout text.
 */
class CheckpointReadinessTimeoutError extends Error {
  constructor(message = "Timed out waiting for checkpointable state") {
    super(message);
    // Error subclasses inherit `name === "Error"` unless it is overridden.
    this.name = "CheckpointReadinessTimeoutError";
  }
}
59+
/**
 * Rejection value used when waiting for a checkpointable state is aborted
 * because the checkpoint was cancelled.
 *
 * Callers distinguish it from other failures via `instanceof`. `name` and a
 * default message are set explicitly so that logged instances are
 * identifiable rather than appearing as a generic `Error` with an empty
 * message.
 *
 * @param message - Optional description; defaults to "Checkpoint cancelled".
 */
class CheckpointCancelError extends Error {
  constructor(message = "Checkpoint cancelled") {
    super(message);
    // Error subclasses inherit `name === "Error"` unless it is overridden.
    this.name = "CheckpointCancelError";
  }
}
60+
5861
class TaskCoordinator {
5962
#httpServer: ReturnType<typeof createServer>;
6063
#checkpointer = new Checkpointer({
@@ -398,9 +401,14 @@ class TaskCoordinator {
398401

399402
let timeout: NodeJS.Timeout | undefined = undefined;
400403

404+
const CHECKPOINTABLE_TIMEOUT_SECONDS = 20;
405+
401406
const isCheckpointable = new Promise((resolve, reject) => {
402407
// We set a reasonable timeout to prevent waiting forever
403-
timeout = setTimeout(() => reject("timeout"), 20_000);
408+
timeout = setTimeout(
409+
() => reject(new CheckpointReadinessTimeoutError()),
410+
CHECKPOINTABLE_TIMEOUT_SECONDS * 1000
411+
);
404412

405413
this.#checkpointableTasks.set(socket.data.runId, { resolve, reject });
406414
});
@@ -415,10 +423,24 @@ class TaskCoordinator {
415423
} catch (error) {
416424
logger.error("Error while waiting for checkpointable state", { error });
417425

418-
await crashRun({
419-
name: "ReadyForCheckpointError",
420-
message: `Failed to become checkpointable for ${reason}`,
421-
});
426+
if (error instanceof CheckpointReadinessTimeoutError) {
427+
await crashRun({
428+
name: error.name,
429+
message: `Failed to become checkpointable in ${CHECKPOINTABLE_TIMEOUT_SECONDS}s for ${reason}`,
430+
});
431+
432+
return {
433+
success: false,
434+
reason: "timeout",
435+
};
436+
}
437+
438+
if (error instanceof CheckpointCancelError) {
439+
return {
440+
success: false,
441+
reason: "canceled",
442+
};
443+
}
422444

423445
return {
424446
success: false,
@@ -1065,7 +1087,7 @@ class TaskCoordinator {
10651087

10661088
if (checkpointWait) {
10671089
// Stop waiting for task to reach checkpointable state
1068-
checkpointWait.reject("Checkpoint cancelled");
1090+
checkpointWait.reject(new CheckpointCancelError());
10691091
}
10701092

10711093
// Cancel checkpointing procedure

0 commit comments

Comments
 (0)