diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index 74d700a680..b044a4d291 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -131,30 +131,28 @@ export class CompleteAttemptService extends BaseService { taskRunAttempt: NonNullable, env?: AuthenticatedEnvironment ): Promise<"COMPLETED"> { - await $transaction(this._prisma, async (tx) => { - await tx.taskRunAttempt.update({ - where: { id: taskRunAttempt.id }, - data: { - status: "COMPLETED", - completedAt: new Date(), - output: completion.output, - outputType: completion.outputType, - usageDurationMs: completion.usage?.durationMs, - taskRun: { - update: { - output: completion.output, - outputType: completion.outputType, - }, + await this._prisma.taskRunAttempt.update({ + where: { id: taskRunAttempt.id }, + data: { + status: "COMPLETED", + completedAt: new Date(), + output: completion.output, + outputType: completion.outputType, + usageDurationMs: completion.usage?.durationMs, + taskRun: { + update: { + output: completion.output, + outputType: completion.outputType, }, }, - }); + }, + }); - const finalizeService = new FinalizeTaskRunService(tx); - await finalizeService.call({ - id: taskRunAttempt.taskRunId, - status: "COMPLETED_SUCCESSFULLY", - completedAt: new Date(), - }); + const finalizeService = new FinalizeTaskRunService(); + await finalizeService.call({ + id: taskRunAttempt.taskRunId, + status: "COMPLETED_SUCCESSFULLY", + completedAt: new Date(), }); // Now we need to "complete" the task run event/span diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 9ba29a8b12..47b3e39a2b 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -414,20 +414,40 @@ export class TriggerTaskService extends BaseService { }, }); + const existingConcurrencyLimit = + typeof taskQueue?.concurrencyLimit === "number" + ? taskQueue.concurrencyLimit + : undefined; + if (taskQueue) { - taskQueue = await tx.taskQueue.update({ - where: { - id: taskQueue.id, - }, - data: { - concurrencyLimit, - rateLimit: body.options.queue.rateLimit, - }, - }); + if (existingConcurrencyLimit !== concurrencyLimit) { + taskQueue = await tx.taskQueue.update({ + where: { + id: taskQueue.id, + }, + data: { + concurrencyLimit: + typeof concurrencyLimit === "number" ? concurrencyLimit : null, + rateLimit: body.options.queue.rateLimit, + }, + }); + + if (typeof taskQueue.concurrencyLimit === "number") { + await marqs?.updateQueueConcurrencyLimits( + environment, + taskQueue.name, + taskQueue.concurrencyLimit + ); + } else { + await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); + } + } } else { + const queueId = generateFriendlyId("queue"); + taskQueue = await tx.taskQueue.create({ data: { - friendlyId: generateFriendlyId("queue"), + friendlyId: queueId, name: queueName, concurrencyLimit, runtimeEnvironmentId: environment.id, @@ -436,16 +456,14 @@ export class TriggerTaskService extends BaseService { type: "NAMED", }, }); - } - if (typeof taskQueue.concurrencyLimit === "number") { - await marqs?.updateQueueConcurrencyLimits( - environment, - taskQueue.name, - taskQueue.concurrencyLimit - ); - } else { - await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name); + if (typeof taskQueue.concurrencyLimit === "number") { + await marqs?.updateQueueConcurrencyLimits( + environment, + taskQueue.name, + taskQueue.concurrencyLimit + ); + } } } diff --git a/references/v3-catalog/src/trigger/batch.ts b/references/v3-catalog/src/trigger/batch.ts index 5e25a2eeb6..bf005079f4 100644 --- a/references/v3-catalog/src/trigger/batch.ts +++ b/references/v3-catalog/src/trigger/batch.ts @@ -55,7 +55,7 @@ export const batchChildTask = task({ retry: { maxAttempts: 2, }, - run: async (payload: string, { ctx }) => { + run: async (payload: any, { ctx }) => { logger.info("Processing child task", { payload }); await wait.for({ seconds: 1 });