Skip to content

Commit c531a9d

Browse files
authored
fix: cleanup ttl expire run graphile jobs (#1373)
* fix: remove ttl expire run graphile jobs when a run is started or completed * Update expireEnqueuedRun.server.ts
1 parent 0bf500f commit c531a9d

File tree

8 files changed

+49
-91
lines changed

8 files changed

+49
-91
lines changed

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
import { parsePacket, TaskRunExecution } from "@trigger.dev/core/v3";
2-
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
2+
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
3+
import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
4+
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
35
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
46
import { logger } from "~/services/logger.server";
7+
import { reportInvocationUsage } from "~/services/platform.v3.server";
58
import { generateFriendlyId } from "../friendlyIdentifiers";
6-
import { BaseService, ServiceValidationError } from "./baseService.server";
7-
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
89
import { machinePresetFromConfig } from "../machinePresets.server";
9-
import { workerQueue } from "~/services/worker.server";
10-
import { MAX_TASK_RUN_ATTEMPTS } from "~/consts";
10+
import { BaseService, ServiceValidationError } from "./baseService.server";
1111
import { CrashTaskRunService } from "./crashTaskRun.server";
12-
import { reportInvocationUsage } from "~/services/platform.v3.server";
12+
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
1313

1414
export class CreateTaskRunAttemptService extends BaseService {
1515
public async call(
@@ -139,6 +139,10 @@ export class CreateTaskRunAttemptService extends BaseService {
139139
status: "EXECUTING",
140140
},
141141
});
142+
143+
if (taskRun.ttl) {
144+
await ExpireEnqueuedRunService.dequeue(taskRun.id, tx);
145+
}
142146
}
143147

144148
return taskRunAttempt;

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

+3-7
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import { $transaction } from "~/db.server";
12
import { logger } from "~/services/logger.server";
23
import { marqs } from "~/v3/marqs/index.server";
34
import { BaseService } from "./baseService.server";
5+
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
46
import { parseNaturalLanguageDuration } from "./triggerTask.server";
5-
import { workerQueue } from "~/services/worker.server";
6-
import { $transaction } from "~/db.server";
77

88
export class EnqueueDelayedRunService extends BaseService {
99
public async call(runId: string) {
@@ -52,11 +52,7 @@ export class EnqueueDelayedRunService extends BaseService {
5252
const expireAt = parseNaturalLanguageDuration(run.ttl);
5353

5454
if (expireAt) {
55-
await workerQueue.enqueue(
56-
"v3.expireRun",
57-
{ runId: run.id },
58-
{ tx, runAt: expireAt, jobKey: `v3.expireRun.${run.id}` }
59-
);
55+
await ExpireEnqueuedRunService.enqueue(run.id, expireAt, tx);
6056
}
6157
}
6258
});

apps/webapp/app/v3/services/expireEnqueuedRun.server.ts

+14
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,22 @@ import { logger } from "~/services/logger.server";
22
import { BaseService } from "./baseService.server";
33
import { eventRepository } from "../eventRepository.server";
44
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
5+
import { workerQueue } from "~/services/worker.server";
6+
import { PrismaClientOrTransaction } from "~/db.server";
57

68
export class ExpireEnqueuedRunService extends BaseService {
9+
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
10+
return await workerQueue.dequeue(`v3.expireRun:${runId}`, { tx });
11+
}
12+
13+
public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
14+
return await workerQueue.enqueue(
15+
"v3.expireRun",
16+
{ runId },
17+
{ runAt, jobKey: `v3.expireRun:${runId}`, tx }
18+
);
19+
}
20+
721
public async call(runId: string) {
822
const run = await this._prisma.taskRun.findUnique({
923
where: {

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

+5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, type FINAL_RUN_STATUSES } fr
77
import { PerformTaskRunAlertsService } from "./alerts/performTaskRunAlerts.server";
88
import { BaseService } from "./baseService.server";
99
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
10+
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
1011

1112
type BaseInput = {
1213
id: string;
@@ -62,6 +63,10 @@ export class FinalizeTaskRunService extends BaseService {
6263
...(include ? { include } : {}),
6364
});
6465

66+
if (run.ttl) {
67+
await ExpireEnqueuedRunService.dequeue(run.id);
68+
}
69+
6570
if (attemptStatus || error) {
6671
await this.finalizeAttempt({ attemptStatus, error, run });
6772
}

apps/webapp/app/v3/services/triggerTask.server.ts

+2-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
2121
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2222
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
2323
import { handleMetadataPacket } from "~/utils/packets";
24+
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
2425

2526
export type TriggerTaskServiceOptions = {
2627
idempotencyKey?: string;
@@ -435,11 +436,7 @@ export class TriggerTaskService extends BaseService {
435436
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);
436437

437438
if (expireAt) {
438-
await workerQueue.enqueue(
439-
"v3.expireRun",
440-
{ runId: taskRun.id },
441-
{ tx, runAt: expireAt, jobKey: `v3.expireRun.${taskRun.id}` }
442-
);
439+
await ExpireEnqueuedRunService.enqueue(taskRun.id, expireAt, tx);
443440
}
444441
}
445442

references/v3-catalog/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"management": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/management.ts",
1212
"queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts",
1313
"build:client": "tsup-node ./src/clientUsage.ts --format esm,cjs",
14-
"client": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/clientUsage.ts",
14+
"client": "tsx -r dotenv/config ./src/clientUsage.ts",
1515
"triggerWithLargePayload": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/triggerWithLargePayload.ts",
1616
"generate:prisma": "prisma generate --sql"
1717
},
+10-71
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
1-
import { tasks, runs, TaskOutput, TaskPayload, TaskIdentifier } from "@trigger.dev/sdk/v3";
2-
import { createJsonHeroDoc } from "./trigger/simple.js";
3-
4-
type createJsonHeroDocPayload = TaskPayload<typeof createJsonHeroDoc>; // retrieves the payload type of the task
5-
type createJsonHeroDocOutput = TaskOutput<typeof createJsonHeroDoc>; // retrieves the output type of the task
6-
type createJsonHeroDocIdentifier = TaskIdentifier<typeof createJsonHeroDoc>; // retrieves the identifier of the task
1+
import { tasks } from "@trigger.dev/sdk/v3";
72

83
async function main() {
9-
const anyHandle = await tasks.trigger(
4+
await tasks.trigger(
105
"create-jsonhero-doc",
116
{
127
title: "Hello World",
@@ -15,78 +10,22 @@ async function main() {
1510
},
1611
},
1712
{
18-
delay: "1m",
1913
ttl: "1m",
2014
}
2115
);
2216

23-
const anyRun = await runs.retrieve(anyHandle);
24-
25-
console.log(`Run ${anyHandle.id} status: ${anyRun.status}, ttl: ${anyRun.ttl}`, anyRun.output);
26-
27-
const typedRun = await runs.retrieve<typeof createJsonHeroDoc>(anyHandle.id);
28-
29-
console.log(`Run ${anyHandle.id} status: ${typedRun.status}`, typedRun.output);
30-
31-
await new Promise((resolve) => setTimeout(resolve, 121000)); // wait for 2 minutes
32-
33-
const expiredRun = await runs.retrieve(anyRun.id);
34-
35-
console.log(
36-
`Run ${anyHandle.id} status: ${expiredRun.status}, expired at: ${expiredRun.expiredAt}`,
37-
expiredRun.output
38-
);
39-
40-
const handle = await tasks.trigger<typeof createJsonHeroDoc>("create-jsonhero-doc", {
41-
title: "Hello World",
42-
content: {
43-
message: "Hello, World!",
44-
},
45-
});
46-
47-
console.log(handle);
48-
49-
const typedRetrieveRun = await runs.retrieve(handle);
50-
51-
console.log(`Run ${handle.id} status: ${typedRetrieveRun.status}`, typedRetrieveRun.output);
52-
53-
const completedRun = await runs.poll(handle, { pollIntervalMs: 100 });
54-
55-
console.log(`Run ${handle.id} completed with output:`, completedRun.output);
56-
57-
const run = await tasks.triggerAndPoll<typeof createJsonHeroDoc>("create-jsonhero-doc", {
58-
title: "Hello World",
59-
content: {
60-
message: "Hello, World!",
61-
},
62-
});
63-
64-
console.log(`Run ${run.id} completed with output: `, run.output);
65-
66-
const batchHandle = await tasks.batchTrigger<typeof createJsonHeroDoc>("create-jsonhero-doc", [
17+
await tasks.trigger(
18+
"create-jsonhero-doc",
6719
{
68-
payload: {
69-
title: "Hello World",
70-
content: {
71-
message: "Hello, World!",
72-
},
20+
title: "Hello World",
21+
content: {
22+
message: "Hello, World!",
7323
},
7424
},
7525
{
76-
payload: {
77-
title: "Hello World 2",
78-
content: {
79-
message: "Hello, World 2!",
80-
},
81-
},
82-
},
83-
]);
84-
85-
const firstRunHandle = batchHandle.runs[0];
86-
87-
const run2 = await runs.retrieve(firstRunHandle);
88-
89-
console.log(`Run ${run2.id} completed with output: `, run2.output);
26+
ttl: "1m",
27+
}
28+
);
9029
}
9130

9231
main().catch(console.error);

references/v3-catalog/src/trigger/simple.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,12 @@ export const taskWithSpecialCharacters = task({
7171

7272
export const createJsonHeroDoc = task({
7373
id: "create-jsonhero-doc",
74+
queue: {
75+
concurrencyLimit: 1,
76+
},
7477
run: async (payload: { title: string; content: any }, { ctx }) => {
7578
// Sleep for 5 seconds
76-
await wait.for({ seconds: 5 });
79+
await wait.for({ seconds: 30 });
7780

7881
const response = await fetch("https://jsonhero.io/api/create.json", {
7982
method: "POST",

0 commit comments

Comments
 (0)