diff --git a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts index 60221d866c..9e0ad17c78 100644 --- a/apps/webapp/app/v3/services/createBackgroundWorker.server.ts +++ b/apps/webapp/app/v3/services/createBackgroundWorker.server.ts @@ -368,6 +368,7 @@ async function createWorkerQueue( concurrencyLimit ?? undefined, orderableName, queueType, + queue.releaseConcurrencyOnWaitpoint, worker, prisma ); @@ -402,6 +403,7 @@ async function upsertWorkerQueueRecord( concurrencyLimit: number | undefined, orderableName: string, queueType: TaskQueueType, + releaseConcurrencyOnWaitpoint: boolean | undefined, worker: BackgroundWorker, prisma: PrismaClientOrTransaction, attempt: number = 0 @@ -429,6 +431,7 @@ async function upsertWorkerQueueRecord( runtimeEnvironmentId: worker.runtimeEnvironmentId, projectId: worker.projectId, type: queueType, + releaseConcurrencyOnWaitpoint, workers: { connect: { id: worker.id, @@ -437,7 +440,7 @@ async function upsertWorkerQueueRecord( }, }); } else { - await prisma.taskQueue.update({ + taskQueue = await prisma.taskQueue.update({ where: { id: taskQueue.id, }, @@ -445,6 +448,7 @@ async function upsertWorkerQueueRecord( workers: { connect: { id: worker.id } }, version: "V2", orderableName, + releaseConcurrencyOnWaitpoint, }, }); } @@ -458,6 +462,7 @@ async function upsertWorkerQueueRecord( concurrencyLimit, orderableName, queueType, + releaseConcurrencyOnWaitpoint, worker, prisma, attempt + 1 diff --git a/packages/core/src/v3/resource-catalog/standardResourceCatalog.ts b/packages/core/src/v3/resource-catalog/standardResourceCatalog.ts index 0e8eaca4d9..92ae0a33df 100644 --- a/packages/core/src/v3/resource-catalog/standardResourceCatalog.ts +++ b/packages/core/src/v3/resource-catalog/standardResourceCatalog.ts @@ -24,6 +24,31 @@ export class StandardResourceCatalog implements ResourceCatalog { } registerQueueMetadata(queue: QueueManifest): void { + const existingQueue = this._queueMetadata.get(queue.name); + + //if it exists already AND concurrencyLimit or releaseConcurrencyOnWaitpoint is different, log a warning + if (existingQueue) { + const isConcurrencyLimitDifferent = existingQueue.concurrencyLimit !== queue.concurrencyLimit; + const isReleaseConcurrencyOnWaitpointDifferent = + existingQueue.releaseConcurrencyOnWaitpoint !== queue.releaseConcurrencyOnWaitpoint; + + if (isConcurrencyLimitDifferent || isReleaseConcurrencyOnWaitpointDifferent) { + let message = `Queue "${queue.name}" is defined twice, with different settings.`; + if (isConcurrencyLimitDifferent) { + message += `\n - concurrencyLimit: ${existingQueue.concurrencyLimit} vs ${queue.concurrencyLimit}`; + } + if (isReleaseConcurrencyOnWaitpointDifferent) { + message += `\n - releaseConcurrencyOnWaitpoint: ${existingQueue.releaseConcurrencyOnWaitpoint} vs ${queue.releaseConcurrencyOnWaitpoint}`; + } + + message += "\n Keeping the first definition:"; + message += `\n - concurrencyLimit: ${existingQueue.concurrencyLimit}`; + message += `\n - releaseConcurrencyOnWaitpoint: ${existingQueue.releaseConcurrencyOnWaitpoint}`; + console.warn(message); + return; + } + } + this._queueMetadata.set(queue.name, queue); } diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index 53044c2f5e..c307bc25f2 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -206,6 +206,7 @@ type CommonTaskOptions< queue?: { name?: string; concurrencyLimit?: number; + releaseConcurrencyOnWaitpoint?: boolean; }; /** Configure the spec of the [machine](https://trigger.dev/docs/machines) you want your task to run on. * diff --git a/packages/trigger-sdk/src/v3/shared.ts b/packages/trigger-sdk/src/v3/shared.ts index d92a891905..a5b3fca392 100644 --- a/packages/trigger-sdk/src/v3/shared.ts +++ b/packages/trigger-sdk/src/v3/shared.ts @@ -119,7 +119,6 @@ export { SubtaskUnwrapError, TaskRunPromise }; export type Context = TaskRunContext; export function queue(options: QueueOptions): Queue { - // TODO register queue here resourceCatalog.registerQueueMetadata(options); // @ts-expect-error @@ -215,6 +214,7 @@ export function createTask< resourceCatalog.registerQueueMetadata({ name: queue.name, concurrencyLimit: queue.concurrencyLimit, + releaseConcurrencyOnWaitpoint: queue.releaseConcurrencyOnWaitpoint, }); } @@ -346,6 +346,7 @@ export function createSchemaTask< resourceCatalog.registerQueueMetadata({ name: queue.name, concurrencyLimit: queue.concurrencyLimit, + releaseConcurrencyOnWaitpoint: queue.releaseConcurrencyOnWaitpoint, }); } diff --git a/references/hello-world/src/trigger/release-concurrency.ts b/references/hello-world/src/trigger/release-concurrency.ts new file mode 100644 index 0000000000..9dbaf135ef --- /dev/null +++ b/references/hello-world/src/trigger/release-concurrency.ts @@ -0,0 +1,149 @@ +import { batch, logger, queue, task, wait } from "@trigger.dev/sdk"; +import assert from "node:assert"; +import { setTimeout } from "node:timers/promises"; + +// Queue with concurrency limit and release enabled +const releaseEnabledQueue = queue({ + name: "release-concurrency-test-queue-enabled", + concurrencyLimit: 2, + releaseConcurrencyOnWaitpoint: true, +}); + +// Queue with concurrency limit but release disabled +const releaseDisabledQueue = queue({ + name: "release-concurrency-test-queue-disabled", + concurrencyLimit: 2, + releaseConcurrencyOnWaitpoint: false, +}); + +// Task that runs on the release-enabled queue +const releaseEnabledTask = task({ + id: "release-concurrency-enabled-task", + queue: releaseEnabledQueue, + retry: { + maxAttempts: 1, + }, + run: async (payload: { id: string; waitSeconds: number }, { ctx }) => { + const startedAt = Date.now(); + logger.info(`Run ${payload.id} started at ${startedAt}`); + + // Wait and release concurrency + await wait.for({ seconds: payload.waitSeconds, releaseConcurrency: true }); + + const resumedAt = Date.now(); + await setTimeout(2000); // Additional work after resuming + const completedAt = Date.now(); + + return { id: payload.id, startedAt, resumedAt, completedAt }; + }, +}); + +// Task that runs on the release-disabled queue +const releaseDisabledTask = task({ + id: "release-concurrency-disabled-task", + queue: releaseDisabledQueue, + retry: { + maxAttempts: 1, + }, + run: async (payload: { id: string; waitSeconds: number }, { ctx }) => { + const startedAt = Date.now(); + logger.info(`Run ${payload.id} started ${startedAt}`); + + // Wait without releasing concurrency + await wait.for({ seconds: payload.waitSeconds }); + + const resumedAt = Date.now(); + await setTimeout(2000); + const completedAt = Date.now(); + + return { id: payload.id, startedAt, resumedAt, completedAt }; + }, +}); + +// Main test task +export const waitReleaseConcurrencyTestTask = task({ + id: "wait-release-concurrency-test", + retry: { + maxAttempts: 1, + }, + run: async (payload, { ctx }) => { + logger.info("Starting wait release concurrency test"); + + // Test 1: Queue with release enabled + logger.info("Testing queue with release enabled"); + const enabledResults = await batch.triggerAndWait([ + { id: releaseEnabledTask.id, payload: { id: "e1", waitSeconds: 6 } }, + { id: releaseEnabledTask.id, payload: { id: "e2", waitSeconds: 6 } }, + { id: releaseEnabledTask.id, payload: { id: "e3", waitSeconds: 6 } }, + ]); + + // Verify all tasks completed + assert( + enabledResults.runs.every((r) => r.ok), + "All enabled tasks should complete" + ); + + // Get executions sorted by start time + const enabledExecutions = enabledResults.runs + .map((r) => r.output) + .sort((a, b) => a.startedAt - b.startedAt); + + // Verify that task e3 could start before e1 and e2 completed + // (because concurrency was released during wait) + const e3 = enabledExecutions.find((e) => e.id === "e3"); + const e1e2CompletedAt = Math.max( + ...enabledExecutions.filter((e) => ["e1", "e2"].includes(e.id)).map((e) => e.completedAt) + ); + + assert( + e3.startedAt < e1e2CompletedAt, + "Task e3 should start before e1/e2 complete due to released concurrency" + ); + + logger.info("✅ test with release enabled"); + + // Test 2: Queue with release disabled + logger.info("Testing queue with release disabled"); + const disabledResults = await batch.triggerAndWait([ + { id: releaseDisabledTask.id, payload: { id: "d1", waitSeconds: 6 } }, + { id: releaseDisabledTask.id, payload: { id: "d2", waitSeconds: 6 } }, + { id: releaseDisabledTask.id, payload: { id: "d3", waitSeconds: 6 } }, + ]); + + // Verify all tasks completed + assert( + disabledResults.runs.every((r) => r.ok), + "All disabled tasks should complete" + ); + + // Get executions sorted by start time + const disabledExecutions = disabledResults.runs + .map((r) => r.output) + .sort((a, b) => a.startedAt - b.startedAt); + + // Verify that task d3 could NOT start before d1 or d2 completed + // (because concurrency was not released during wait) + const d3 = disabledExecutions.find((e) => e.id === "d3"); + const d1d2CompletedAt = Math.max( + ...disabledExecutions.filter((e) => ["d1", "d2"].includes(e.id)).map((e) => e.completedAt) + ); + + assert( + d3.startedAt >= d1d2CompletedAt, + "Task d3 should not start before d1/d2 complete when concurrency is not released" + ); + + logger.info("✅ test with release disabled"); + + return { + enabledQueueResults: { + executions: enabledExecutions, + concurrencyReleased: true, + }, + disabledQueueResults: { + executions: disabledExecutions, + concurrencyReleased: false, + }, + }; + }, +});