Skip to content

Commit b0c60a0

Browse files
committed
Move run ttl and delays from graphile to redis worker
1 parent a2c2d92 commit b0c60a0

7 files changed

+91
-42
lines changed

apps/webapp/app/v3/commonWorker.server.ts

+33-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { singleton } from "~/utils/singleton";
77
import { DeliverAlertService } from "./services/alerts/deliverAlert.server";
88
import { PerformDeploymentAlertsService } from "./services/alerts/performDeploymentAlerts.server";
99
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
10+
import { ExpireEnqueuedRunService } from "./services/expireEnqueuedRun.server";
11+
import { EnqueueDelayedRunService } from "./services/enqueueDelayedRun.server";
1012

1113
function initializeWorker() {
1214
const redisOptions = {
@@ -52,6 +54,24 @@ function initializeWorker() {
5254
maxAttempts: 3,
5355
},
5456
},
57+
"v3.expireRun": {
58+
schema: z.object({
59+
runId: z.string(),
60+
}),
61+
visibilityTimeoutMs: 60_000,
62+
retry: {
63+
maxAttempts: 6,
64+
},
65+
},
66+
"v3.enqueueDelayedRun": {
67+
schema: z.object({
68+
runId: z.string(),
69+
}),
70+
visibilityTimeoutMs: 60_000,
71+
retry: {
72+
maxAttempts: 6,
73+
},
74+
},
5575
},
5676
concurrency: {
5777
workers: env.COMMON_WORKER_CONCURRENCY_WORKERS,
@@ -65,16 +85,26 @@ function initializeWorker() {
6585
"v3.deliverAlert": async ({ payload }) => {
6686
const service = new DeliverAlertService();
6787

68-
return await service.call(payload.alertId);
88+
await service.call(payload.alertId);
6989
},
7090
"v3.performDeploymentAlerts": async ({ payload }) => {
7191
const service = new PerformDeploymentAlertsService();
7292

73-
return await service.call(payload.deploymentId);
93+
await service.call(payload.deploymentId);
7494
},
7595
"v3.performTaskRunAlerts": async ({ payload }) => {
7696
const service = new PerformTaskRunAlertsService();
77-
return await service.call(payload.runId);
97+
await service.call(payload.runId);
98+
},
99+
"v3.expireRun": async ({ payload }) => {
100+
const service = new ExpireEnqueuedRunService();
101+
102+
await service.call(payload.runId);
103+
},
104+
"v3.enqueueDelayedRun": async ({ payload }) => {
105+
const service = new EnqueueDelayedRunService();
106+
107+
await service.call(payload.runId);
78108
},
79109
},
80110
});

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ export class CreateTaskRunAttemptService extends BaseService {
156156
});
157157

158158
if (taskRun.ttl) {
159-
await ExpireEnqueuedRunService.dequeue(taskRun.id, tx);
159+
await ExpireEnqueuedRunService.ack(taskRun.id, tx);
160160
}
161161
}
162162

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

+27-1
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,34 @@ import { logger } from "~/services/logger.server";
44
import { marqs } from "~/v3/marqs/index.server";
55
import { BaseService } from "./baseService.server";
66
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
7+
import { commonWorker } from "../commonWorker.server";
8+
import { workerQueue } from "~/services/worker.server";
79

810
export class EnqueueDelayedRunService extends BaseService {
11+
public static async enqueue(runId: string, runAt?: Date) {
12+
await commonWorker.enqueue({
13+
job: "v3.enqueueDelayedRun",
14+
payload: { runId },
15+
availableAt: runAt,
16+
id: `v3.enqueueDelayed:${runId}`,
17+
});
18+
}
19+
20+
public static async reschedule(runId: string, runAt?: Date) {
21+
// We have to do this for now because it's possible that the workerQueue
22+
// was used when the run was first delayed, and EnqueueDelayedRunService.reschedule
23+
// is called from RescheduleTaskRunService, which allows the runAt to be changed
24+
// so if we don't dequeue the old job, we might end up with multiple jobs
25+
await workerQueue.dequeue(`v3.enqueueDelayedRun.${runId}`);
26+
27+
await commonWorker.enqueue({
28+
job: "v3.enqueueDelayedRun",
29+
payload: { runId },
30+
availableAt: runAt,
31+
id: `v3.enqueueDelayed:${runId}`,
32+
});
33+
}
34+
935
public async call(runId: string) {
1036
const run = await this._prisma.taskRun.findFirst({
1137
where: {
@@ -52,7 +78,7 @@ export class EnqueueDelayedRunService extends BaseService {
5278
const expireAt = parseNaturalLanguageDuration(run.ttl);
5379

5480
if (expireAt) {
55-
await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx);
81+
await ExpireEnqueuedRunService.enqueue(run.id, expireAt);
5682
}
5783
}
5884
});

apps/webapp/app/v3/services/expireEnqueuedRun.server.ts

+14-11
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
1+
import { PrismaClientOrTransaction } from "~/db.server";
12
import { logger } from "~/services/logger.server";
2-
import { BaseService } from "./baseService.server";
3+
import { commonWorker } from "../commonWorker.server";
34
import { eventRepository } from "../eventRepository.server";
5+
import { BaseService } from "./baseService.server";
46
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
5-
import { workerQueue } from "~/services/worker.server";
6-
import { PrismaClientOrTransaction } from "~/db.server";
77

88
export class ExpireEnqueuedRunService extends BaseService {
9-
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
10-
return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx });
9+
public static async ack(runId: string, tx?: PrismaClientOrTransaction) {
10+
// We don't "dequeue" from the workerQueue here because it would be redundant and if this service
11+
// is called for a run that has already started, nothing happens
12+
await commonWorker.ack(`v3.expireRun:${runId}`);
1113
}
1214

13-
public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
14-
return await workerQueue.enqueue(
15-
"v3.expireRun",
16-
{ runId },
17-
{ runAt, jobKey: `v3.expireRun:${runId}`, tx }
18-
);
15+
public static async enqueue(runId: string, runAt?: Date) {
16+
return await commonWorker.enqueue({
17+
job: "v3.expireRun",
18+
payload: { runId },
19+
availableAt: runAt,
20+
id: `v3.expireRun:${runId}`,
21+
});
1922
}
2023

2124
public async call(runId: string) {

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ export class FinalizeTaskRunService extends BaseService {
101101
});
102102

103103
if (run.ttl) {
104-
await ExpireEnqueuedRunService.dequeue(run.id);
104+
await ExpireEnqueuedRunService.ack(run.id);
105105
}
106106

107107
if (attemptStatus || error) {
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1+
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
12
import { TaskRun } from "@trigger.dev/database";
23
import { BaseService, ServiceValidationError } from "./baseService.server";
3-
import { RescheduleRunRequestBody } from "@trigger.dev/core/v3";
4+
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
45
import { parseDelay } from "./triggerTask.server";
5-
import { $transaction } from "~/db.server";
6-
import { workerQueue } from "~/services/worker.server";
76

87
export class RescheduleTaskRunService extends BaseService {
98
public async call(taskRun: TaskRun, body: RescheduleRunRequestBody) {
@@ -17,23 +16,17 @@ export class RescheduleTaskRunService extends BaseService {
1716
throw new ServiceValidationError(`Invalid delay: ${body.delay}`);
1817
}
1918

20-
return await $transaction(this._prisma, "reschedule run", async (tx) => {
21-
const updatedRun = await tx.taskRun.update({
22-
where: {
23-
id: taskRun.id,
24-
},
25-
data: {
26-
delayUntil: delay,
27-
},
28-
});
19+
const updatedRun = await this._prisma.taskRun.update({
20+
where: {
21+
id: taskRun.id,
22+
},
23+
data: {
24+
delayUntil: delay,
25+
},
26+
});
2927

30-
await workerQueue.enqueue(
31-
"v3.enqueueDelayedRun",
32-
{ runId: taskRun.id },
33-
{ tx, runAt: delay, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
34-
);
28+
await EnqueueDelayedRunService.reschedule(taskRun.id, delay);
3529

36-
return updatedRun;
37-
});
30+
return updatedRun;
3831
}
3932
}

apps/webapp/app/v3/services/triggerTask.server.ts

+3-6
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { clampMaxDuration } from "../utils/maxDuration";
2828
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
2929
import { Prisma, TaskRun } from "@trigger.dev/database";
3030
import { sanitizeQueueName } from "~/models/taskQueue.server";
31+
import { EnqueueDelayedRunService } from "./enqueueDelayedRun.server";
3132

3233
export type TriggerTaskServiceOptions = {
3334
idempotencyKey?: string;
@@ -515,18 +516,14 @@ export class TriggerTaskService extends BaseService {
515516
}
516517

517518
if (taskRun.delayUntil) {
518-
await workerQueue.enqueue(
519-
"v3.enqueueDelayedRun",
520-
{ runId: taskRun.id },
521-
{ tx, runAt: delayUntil, jobKey: `v3.enqueueDelayedRun.${taskRun.id}` }
522-
);
519+
await EnqueueDelayedRunService.enqueue(taskRun.id, taskRun.delayUntil);
523520
}
524521

525522
if (!taskRun.delayUntil && taskRun.ttl) {
526523
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);
527524

528525
if (expireAt) {
529-
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx);
526+
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt);
530527
}
531528
}
532529

0 commit comments

Comments
 (0)