Skip to content

Commit 56d69f9

Browse files
committed
Move the task run heartbeats to RedisWorker
1 parent b946b9f commit 56d69f9

18 files changed

+467
-195
lines changed

apps/webapp/app/env.server.ts

+36
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,42 @@ const EnvironmentSchema = z.object({
368368
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
369369
BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"),
370370
BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"),
371+
372+
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
373+
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
374+
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
375+
376+
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
377+
.string()
378+
.optional()
379+
.transform((v) => v ?? process.env.REDIS_HOST),
380+
LEGACY_RUN_ENGINE_WORKER_REDIS_READER_HOST: z
381+
.string()
382+
.optional()
383+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
384+
LEGACY_RUN_ENGINE_WORKER_REDIS_READER_PORT: z.coerce
385+
.number()
386+
.optional()
387+
.transform(
388+
(v) =>
389+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
390+
),
391+
LEGACY_RUN_ENGINE_WORKER_REDIS_PORT: z.coerce
392+
.number()
393+
.optional()
394+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
395+
LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME: z
396+
.string()
397+
.optional()
398+
.transform((v) => v ?? process.env.REDIS_USERNAME),
399+
LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD: z
400+
.string()
401+
.optional()
402+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
403+
LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED: z
404+
.string()
405+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
406+
LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
371407
});
372408

373409
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/services/worker.server.ts

+1-6
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { $replica, prisma } from "~/db.server";
66
import { env } from "~/env.server";
77
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
88
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
9-
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
109
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
1110
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
1211
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
@@ -658,11 +657,7 @@ function getWorkerQueue() {
658657
"v3.requeueTaskRun": {
659658
priority: 0,
660659
maxAttempts: 3,
661-
handler: async (payload, job) => {
662-
const service = new RequeueTaskRunService();
663-
664-
await service.call(payload.runId);
665-
},
660+
handler: async (payload, job) => {}, // This is now handled by redisWorker
666661
},
667662
"v3.retryAttempt": {
668663
priority: 0,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import { Worker as RedisWorker } from "@internal/redis-worker";
2+
import { Logger } from "@trigger.dev/core/logger";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { singleton } from "~/utils/singleton";
7+
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
8+
import { tracer } from "./tracer.server";
9+
10+
const workerCatalog = {
11+
runHeartbeat: {
12+
schema: z.object({
13+
runId: z.string(),
14+
}),
15+
visibilityTimeoutMs: 10000,
16+
},
17+
};
18+
19+
function initializeWorker() {
20+
if (env.WORKER_ENABLED !== "true") {
21+
logger.debug("RedisWorker not initialized because WORKER_ENABLED is not set to true");
22+
return;
23+
}
24+
25+
if (!env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST || !env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT) {
26+
logger.debug(
27+
"RedisWorker not initialized because LEGACY_RUN_ENGINE_WORKER_REDIS_HOST or LEGACY_RUN_ENGINE_WORKER_REDIS_PORT is not set"
28+
);
29+
return;
30+
}
31+
32+
const redisOptions = {
33+
keyPrefix: "legacy-run-engine:worker:",
34+
host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST,
35+
port: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT,
36+
username: env.LEGACY_RUN_ENGINE_WORKER_REDIS_USERNAME,
37+
password: env.LEGACY_RUN_ENGINE_WORKER_REDIS_PASSWORD,
38+
enableAutoPipelining: true,
39+
...(env.LEGACY_RUN_ENGINE_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
40+
};
41+
42+
logger.debug(
43+
`👨‍🏭 Initializing legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}`
44+
);
45+
46+
const worker = new RedisWorker({
47+
name: "legacy-run-engine-worker",
48+
redisOptions,
49+
catalog: workerCatalog,
50+
concurrency: {
51+
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
52+
tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
53+
},
54+
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
55+
logger: new Logger("LegacyRunEngineWorker", "debug"),
56+
jobs: {
57+
runHeartbeat: async ({ payload }) => {
58+
const service = new TaskRunHeartbeatFailedService();
59+
60+
await service.call(payload.runId);
61+
},
62+
},
63+
});
64+
65+
return worker;
66+
}
67+
68+
export const legacyRunEngineWorker = singleton("legacyRunEngineWorker", initializeWorker);

apps/webapp/app/v3/marqs/index.server.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import {
3030
MessageQueueSubscriber,
3131
VisibilityTimeoutStrategy,
3232
} from "./types";
33-
import { V3VisibilityTimeout } from "./v3VisibilityTimeout.server";
33+
import { V3LegacyRunEngineWorkerVisibilityTimeout } from "./v3VisibilityTimeout.server";
3434

3535
const KEY_PREFIX = "marqs:";
3636

@@ -1611,7 +1611,7 @@ function getMarQSClient() {
16111611
name: "marqs",
16121612
tracer: trace.getTracer("marqs"),
16131613
keysProducer,
1614-
visibilityTimeoutStrategy: new V3VisibilityTimeout(),
1614+
visibilityTimeoutStrategy: new V3LegacyRunEngineWorkerVisibilityTimeout(),
16151615
queuePriorityStrategy: new FairDequeuingStrategy({
16161616
tracer: tracer,
16171617
redis,
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,28 @@
1-
import { RequeueTaskRunService } from "../requeueTaskRun.server";
1+
import { legacyRunEngineWorker } from "../legacyRunEngineWorker.server";
2+
import { TaskRunHeartbeatFailedService } from "../taskRunHeartbeatFailed.server";
23
import { VisibilityTimeoutStrategy } from "./types";
34

4-
export class V3VisibilityTimeout implements VisibilityTimeoutStrategy {
5+
export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
56
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
6-
await RequeueTaskRunService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
7+
await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
78
}
89

910
async cancelHeartbeat(messageId: string): Promise<void> {
10-
await RequeueTaskRunService.dequeue(messageId);
11+
await TaskRunHeartbeatFailedService.dequeue(messageId);
12+
}
13+
}
14+
15+
export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy {
16+
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
17+
await legacyRunEngineWorker?.enqueue({
18+
id: `heartbeat:${messageId}`,
19+
job: "runHeartbeat",
20+
payload: { runId: messageId },
21+
availableAt: new Date(Date.now() + timeoutInMs),
22+
});
23+
}
24+
25+
async cancelHeartbeat(messageId: string): Promise<void> {
26+
await legacyRunEngineWorker?.ack(`heartbeat:${messageId}`);
1127
}
1228
}

apps/webapp/app/v3/requeueTaskRun.server.ts renamed to apps/webapp/app/v3/taskRunHeartbeatFailed.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { workerQueue } from "~/services/worker.server";
99
import { socketIo } from "./handleSocketIo.server";
1010
import { TaskRunErrorCodes } from "@trigger.dev/core/v3";
1111

12-
export class RequeueTaskRunService extends BaseService {
12+
export class TaskRunHeartbeatFailedService extends BaseService {
1313
public async call(runId: string) {
1414
const taskRun = await this._prisma.taskRun.findFirst({
1515
where: {

apps/webapp/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
"@headlessui/react": "^1.7.8",
5252
"@heroicons/react": "^2.0.12",
5353
"@internal/zod-worker": "workspace:*",
54+
"@internal/redis-worker": "workspace:*",
5455
"@internationalized/date": "^3.5.1",
5556
"@lezer/highlight": "^1.1.6",
5657
"@opentelemetry/api": "1.9.0",

apps/webapp/tsconfig.json

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
"emails": ["../../internal-packages/emails/src/index"],
3535
"emails/*": ["../../internal-packages/emails/src/*"],
3636
"@internal/zod-worker": ["../../internal-packages/zod-worker/src/index"],
37-
"@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"]
37+
"@internal/zod-worker/*": ["../../internal-packages/zod-worker/src/*"],
38+
"@internal/redis-worker": ["../../internal-packages/redis-worker/src/index"],
39+
"@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"]
3840
},
3941
"noEmit": true
4042
}

internal-packages/redis-worker/package.json

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
"ioredis": "^5.3.2",
1212
"lodash.omit": "^4.5.0",
1313
"nanoid": "^5.0.7",
14-
"typescript": "^5.5.4",
1514
"zod": "3.23.8"
1615
},
1716
"devDependencies": {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
export * from "./queue";
2+
export * from "./worker";

0 commit comments

Comments
 (0)