Skip to content

Commit ac71200

Browse files
committed
Move alerts to redis worker, improving redis worker
1 parent 56d69f9 commit ac71200

22 files changed

+453
-408
lines changed

apps/webapp/app/env.server.ts

+38
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,8 @@ const EnvironmentSchema = z.object({
372372
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
373373
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),
374374
LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
375+
LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
376+
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
375377

376378
LEGACY_RUN_ENGINE_WORKER_REDIS_HOST: z
377379
.string()
@@ -404,6 +406,42 @@ const EnvironmentSchema = z.object({
404406
.string()
405407
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
406408
LEGACY_RUN_ENGINE_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
409+
410+
COMMON_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
411+
COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
412+
COMMON_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
413+
COMMON_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
414+
COMMON_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(100),
415+
416+
COMMON_WORKER_REDIS_HOST: z
417+
.string()
418+
.optional()
419+
.transform((v) => v ?? process.env.REDIS_HOST),
420+
COMMON_WORKER_REDIS_READER_HOST: z
421+
.string()
422+
.optional()
423+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
424+
COMMON_WORKER_REDIS_READER_PORT: z.coerce
425+
.number()
426+
.optional()
427+
.transform(
428+
(v) =>
429+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
430+
),
431+
COMMON_WORKER_REDIS_PORT: z.coerce
432+
.number()
433+
.optional()
434+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
435+
COMMON_WORKER_REDIS_USERNAME: z
436+
.string()
437+
.optional()
438+
.transform((v) => v ?? process.env.REDIS_USERNAME),
439+
COMMON_WORKER_REDIS_PASSWORD: z
440+
.string()
441+
.optional()
442+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
443+
COMMON_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
444+
COMMON_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
407445
});
408446

409447
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/services/worker.server.ts

-13
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
88
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
99
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
1010
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
11-
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
1211
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
1312
import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
1413
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
@@ -156,9 +155,6 @@ const workerCatalog = {
156155
"v3.performTaskRunAlerts": z.object({
157156
runId: z.string(),
158157
}),
159-
"v3.performTaskAttemptAlerts": z.object({
160-
attemptId: z.string(),
161-
}),
162158
"v3.deliverAlert": z.object({
163159
alertId: z.string(),
164160
}),
@@ -609,15 +605,6 @@ function getWorkerQueue() {
609605
return await service.call(payload.runId);
610606
},
611607
},
612-
"v3.performTaskAttemptAlerts": {
613-
priority: 0,
614-
maxAttempts: 3,
615-
handler: async (payload, job) => {
616-
const service = new PerformTaskAttemptAlertsService();
617-
618-
return await service.call(payload.attemptId);
619-
},
620-
},
621608
"v3.deliverAlert": {
622609
priority: 0,
623610
maxAttempts: 8,
+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import { Worker as RedisWorker } from "@internal/redis-worker";
2+
import { Logger } from "@trigger.dev/core/logger";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { singleton } from "~/utils/singleton";
7+
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
8+
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
9+
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
10+
11+
function initializeWorker() {
12+
const redisOptions = {
13+
keyPrefix: "common:worker:",
14+
host: env.COMMON_WORKER_REDIS_HOST,
15+
port: env.COMMON_WORKER_REDIS_PORT,
16+
username: env.COMMON_WORKER_REDIS_USERNAME,
17+
password: env.COMMON_WORKER_REDIS_PASSWORD,
18+
enableAutoPipelining: true,
19+
...(env.COMMON_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
20+
};
21+
22+
logger.debug(`👨‍🏭 Initializing common worker at host ${env.COMMON_WORKER_REDIS_HOST}`);
23+
24+
const worker = new RedisWorker({
25+
name: "common-worker",
26+
redisOptions,
27+
catalog: {
28+
"v3.performTaskRunAlerts": {
29+
schema: z.object({
30+
runId: z.string(),
31+
}),
32+
visibilityTimeoutMs: 60_000,
33+
retry: {
34+
maxAttempts: 3,
35+
},
36+
},
37+
"v3.performDeploymentAlerts": {
38+
schema: z.object({
39+
deploymentId: z.string(),
40+
}),
41+
visibilityTimeoutMs: 60_000,
42+
retry: {
43+
maxAttempts: 3,
44+
},
45+
},
46+
"v3.deliverAlert": {
47+
schema: z.object({
48+
alertId: z.string(),
49+
}),
50+
visibilityTimeoutMs: 60_000,
51+
retry: {
52+
maxAttempts: 3,
53+
},
54+
},
55+
},
56+
concurrency: {
57+
workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
58+
tasksPerWorker: env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER,
59+
limit: env.COMMON_WORKER_CONCURRENCY_LIMIT,
60+
},
61+
pollIntervalMs: env.COMMON_WORKER_POLL_INTERVAL,
62+
immediatePollIntervalMs: env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL,
63+
logger: new Logger("CommonWorker", "debug"),
64+
jobs: {
65+
"v3.deliverAlert": async ({ payload }) => {
66+
const service = new DeliverAlertService();
67+
68+
return await service.call(payload.alertId);
69+
},
70+
"v3.performDeploymentAlerts": async ({ payload }) => {
71+
const service = new PerformDeploymentAlertsService();
72+
73+
return await service.call(payload.deploymentId);
74+
},
75+
"v3.performTaskRunAlerts": async ({ payload }) => {
76+
const service = new PerformTaskRunAlertsService();
77+
return await service.call(payload.runId);
78+
},
79+
},
80+
});
81+
82+
if (env.WORKER_ENABLED === "true") {
83+
logger.debug(
84+
`👨‍🏭 Starting common worker at host ${env.COMMON_WORKER_REDIS_HOST}, pollInterval = ${env.COMMON_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.COMMON_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.COMMON_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.COMMON_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.COMMON_WORKER_CONCURRENCY_LIMIT}`
85+
);
86+
87+
worker.start();
88+
}
89+
90+
return worker;
91+
}
92+
93+
export const commonWorker = singleton("commonWorker", initializeWorker);

apps/webapp/app/v3/legacyRunEngineWorker.server.ts

+21-23
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,8 @@ import { env } from "~/env.server";
55
import { logger } from "~/services/logger.server";
66
import { singleton } from "~/utils/singleton";
77
import { TaskRunHeartbeatFailedService } from "./taskRunHeartbeatFailed.server";
8-
import { tracer } from "./tracer.server";
9-
10-
const workerCatalog = {
11-
runHeartbeat: {
12-
schema: z.object({
13-
runId: z.string(),
14-
}),
15-
visibilityTimeoutMs: 10000,
16-
},
17-
};
188

199
function initializeWorker() {
20-
if (env.WORKER_ENABLED !== "true") {
21-
logger.debug("RedisWorker not initialized because WORKER_ENABLED is not set to true");
22-
return;
23-
}
24-
25-
if (!env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST || !env.LEGACY_RUN_ENGINE_WORKER_REDIS_PORT) {
26-
logger.debug(
27-
"RedisWorker not initialized because LEGACY_RUN_ENGINE_WORKER_REDIS_HOST or LEGACY_RUN_ENGINE_WORKER_REDIS_PORT is not set"
28-
);
29-
return;
30-
}
31-
3210
const redisOptions = {
3311
keyPrefix: "legacy-run-engine:worker:",
3412
host: env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST,
@@ -46,12 +24,24 @@ function initializeWorker() {
4624
const worker = new RedisWorker({
4725
name: "legacy-run-engine-worker",
4826
redisOptions,
49-
catalog: workerCatalog,
27+
catalog: {
28+
runHeartbeat: {
29+
schema: z.object({
30+
runId: z.string(),
31+
}),
32+
visibilityTimeoutMs: 60_000,
33+
retry: {
34+
maxAttempts: 3,
35+
},
36+
},
37+
},
5038
concurrency: {
5139
workers: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS,
5240
tasksPerWorker: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER,
41+
limit: env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT,
5342
},
5443
pollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL,
44+
immediatePollIntervalMs: env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL,
5545
logger: new Logger("LegacyRunEngineWorker", "debug"),
5646
jobs: {
5747
runHeartbeat: async ({ payload }) => {
@@ -62,6 +52,14 @@ function initializeWorker() {
6252
},
6353
});
6454

55+
if (env.WORKER_ENABLED === "true") {
56+
logger.debug(
57+
`👨‍🏭 Starting legacy run engine worker at host ${env.LEGACY_RUN_ENGINE_WORKER_REDIS_HOST}, pollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_POLL_INTERVAL}, immediatePollInterval = ${env.LEGACY_RUN_ENGINE_WORKER_IMMEDIATE_POLL_INTERVAL}, workers = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS}, tasksPerWorker = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER}, concurrencyLimit = ${env.LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_LIMIT}`
58+
);
59+
60+
worker.start();
61+
}
62+
6563
return worker;
6664
}
6765

apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ export class V3GraphileVisibilityTimeout implements VisibilityTimeoutStrategy {
1414

1515
export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeoutStrategy {
1616
async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
17-
await legacyRunEngineWorker?.enqueue({
17+
await legacyRunEngineWorker.enqueue({
1818
id: `heartbeat:${messageId}`,
1919
job: "runHeartbeat",
2020
payload: { runId: messageId },
@@ -23,6 +23,6 @@ export class V3LegacyRunEngineWorkerVisibilityTimeout implements VisibilityTimeo
2323
}
2424

2525
async cancelHeartbeat(messageId: string): Promise<void> {
26-
await legacyRunEngineWorker?.ack(`heartbeat:${messageId}`);
26+
await legacyRunEngineWorker.ack(`heartbeat:${messageId}`);
2727
}
2828
}

apps/webapp/app/v3/services/alerts/deliverAlert.server.ts

+8-16
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { decryptSecret } from "~/services/secrets/secretStore.server";
2828
import { workerQueue } from "~/services/worker.server";
2929
import { BaseService } from "../baseService.server";
3030
import { FINAL_ATTEMPT_STATUSES } from "~/v3/taskStatus";
31+
import { commonWorker } from "~/v3/commonWorker.server";
3132

3233
type FoundAlert = Prisma.Result<
3334
typeof prisma.projectAlert,
@@ -1092,22 +1093,13 @@ export class DeliverAlertService extends BaseService {
10921093
return text;
10931094
}
10941095

1095-
static async enqueue(
1096-
alertId: string,
1097-
tx: PrismaClientOrTransaction,
1098-
options?: { runAt?: Date; queueName?: string }
1099-
) {
1100-
return await workerQueue.enqueue(
1101-
"v3.deliverAlert",
1102-
{
1103-
alertId,
1104-
},
1105-
{
1106-
tx,
1107-
runAt: options?.runAt,
1108-
jobKey: `deliverAlert:${alertId}`,
1109-
}
1110-
);
1096+
static async enqueue(alertId: string, runAt?: Date) {
1097+
return await commonWorker.enqueue({
1098+
id: `alert:${alertId}`,
1099+
job: "v3.deliverAlert",
1100+
payload: { alertId },
1101+
availableAt: runAt,
1102+
});
11111103
}
11121104
}
11131105

apps/webapp/app/v3/services/alerts/performDeploymentAlerts.server.ts

+20-26
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { workerQueue } from "~/services/worker.server";
44
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
55
import { BaseService } from "../baseService.server";
66
import { DeliverAlertService } from "./deliverAlert.server";
7+
import { commonWorker } from "~/v3/commonWorker.server";
78

89
export class PerformDeploymentAlertsService extends BaseService {
910
public async call(deploymentId: string) {
@@ -45,34 +46,27 @@ export class PerformDeploymentAlertsService extends BaseService {
4546
deployment: WorkerDeployment,
4647
alertType: ProjectAlertType
4748
) {
48-
await $transaction(this._prisma, "create and send deploy alert", async (tx) => {
49-
const alert = await this._prisma.projectAlert.create({
50-
data: {
51-
friendlyId: generateFriendlyId("alert"),
52-
channelId: alertChannel.id,
53-
projectId: deployment.projectId,
54-
environmentId: deployment.environmentId,
55-
status: "PENDING",
56-
type: alertType,
57-
workerDeploymentId: deployment.id,
58-
},
59-
});
60-
61-
await DeliverAlertService.enqueue(alert.id, tx);
49+
const alert = await this._prisma.projectAlert.create({
50+
data: {
51+
friendlyId: generateFriendlyId("alert"),
52+
channelId: alertChannel.id,
53+
projectId: deployment.projectId,
54+
environmentId: deployment.environmentId,
55+
status: "PENDING",
56+
type: alertType,
57+
workerDeploymentId: deployment.id,
58+
},
6259
});
60+
61+
await DeliverAlertService.enqueue(alert.id);
6362
}
6463

65-
static async enqueue(deploymentId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
66-
return await workerQueue.enqueue(
67-
"v3.performDeploymentAlerts",
68-
{
69-
deploymentId,
70-
},
71-
{
72-
tx,
73-
runAt,
74-
jobKey: `performDeploymentAlerts:${deploymentId}`,
75-
}
76-
);
64+
static async enqueue(deploymentId: string, runAt?: Date) {
65+
return await commonWorker.enqueue({
66+
id: `performDeploymentAlerts:${deploymentId}`,
67+
job: "v3.performDeploymentAlerts",
68+
payload: { deploymentId },
69+
availableAt: runAt,
70+
});
7771
}
7872
}

0 commit comments

Comments
 (0)