diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 446cd923d6..e60a257257 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -213,6 +213,9 @@ const EnvironmentSchema = z.object({ TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB + + MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(), + MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(), }); export type Environment = z.infer; diff --git a/apps/webapp/app/v3/marqs/index.server.ts b/apps/webapp/app/v3/marqs/index.server.ts index 81fec8e199..6839f7761b 100644 --- a/apps/webapp/app/v3/marqs/index.server.ts +++ b/apps/webapp/app/v3/marqs/index.server.ts @@ -139,6 +139,10 @@ export class MarQS { return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey)); } + public async lengthOfEnvQueue(env: AuthenticatedEnvironment) { + return this.redis.zcard(this.keys.envQueueKey(env)); + } + public async oldestMessageInQueue( env: AuthenticatedEnvironment, queue: string, @@ -1074,6 +1078,7 @@ export class MarQS { concurrencyKey, envConcurrencyKey, orgConcurrencyKey, + this.keys.envQueueKeyFromQueue(message.queue), message.queue, message.messageId, JSON.stringify(message), @@ -1111,6 +1116,7 @@ export class MarQS { currentConcurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey, + this.keys.envQueueKeyFromQueue(messageQueue), messageQueue, String(Date.now()), String(this.options.defaultEnvConcurrency), @@ -1187,6 +1193,7 @@ export class MarQS { concurrencyKey, envConcurrencyKey, orgConcurrencyKey, + this.keys.envQueueKeyFromQueue(messageQueue), messageId, messageQueue ); @@ -1234,6 +1241,7 @@ export class MarQS { envConcurrencyKey, orgConcurrencyKey, visibilityQueue, + this.keys.envQueueKeyFromQueue(messageQueue), messageQueue, messageId, String(Date.now()), @@ -1347,7 +1355,7 @@ export class MarQS { #registerCommands() { this.redis.defineCommand("enqueueMessage", { - numberOfKeys: 6, + numberOfKeys: 7, lua: ` local queue = KEYS[1] local parentQueue = KEYS[2] @@ -1355,6 +1363,7 @@ local messageKey = KEYS[3] local concurrencyKey = KEYS[4] local envCurrentConcurrencyKey = KEYS[5] local orgCurrentConcurrencyKey = KEYS[6] +local envQueue = KEYS[7] local queueName = ARGV[1] local messageId = ARGV[2] @@ -1367,6 +1376,9 @@ redis.call('SET', messageKey, messageData) -- Add the message to the queue redis.call('ZADD', queue, messageScore, messageId) +-- Add the message to the env queue +redis.call('ZADD', envQueue, messageScore, messageId) + -- Rebalance the parent queue local earliestMessage = redis.call('ZRANGE', queue, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then @@ -1383,7 +1395,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId) }); this.redis.defineCommand("dequeueMessage", { - numberOfKeys: 8, + numberOfKeys: 9, lua: ` -- Keys: childQueue, parentQueue, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, currentConcurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey local childQueue = KEYS[1] @@ -1394,6 +1406,7 @@ local orgConcurrencyLimitKey = KEYS[5] local currentConcurrencyKey = KEYS[6] local envCurrentConcurrencyKey = KEYS[7] local orgCurrentConcurrencyKey = KEYS[8] +local envQueueKey = KEYS[9] -- Args: childQueueName, currentTime, defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit local childQueueName = ARGV[1] @@ -1438,6 +1451,7 @@ local messageScore = tonumber(messages[2]) -- Move message to timeout queue and update concurrency redis.call('ZREM', childQueue, messageId) +redis.call('ZREM', envQueueKey, messageId) redis.call('SADD', currentConcurrencyKey, messageId) redis.call('SADD', envCurrentConcurrencyKey, messageId) redis.call('SADD', orgCurrentConcurrencyKey, messageId) @@ -1474,7 +1488,7 @@ redis.call('SET', messageKey, messageData, 'GET') }); this.redis.defineCommand("acknowledgeMessage", { - numberOfKeys: 7, + numberOfKeys: 8, lua: ` -- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey local parentQueue = KEYS[1] @@ -1484,6 +1498,7 @@ local visibilityQueue = KEYS[4] local concurrencyKey = KEYS[5] local envCurrentConcurrencyKey = KEYS[6] local orgCurrentConcurrencyKey = KEYS[7] +local envQueueKey = KEYS[8] -- Args: messageId, messageQueueName local messageId = ARGV[1] @@ -1495,6 +1510,9 @@ redis.call('DEL', messageKey) -- Remove the message from the queue redis.call('ZREM', messageQueue, messageId) +-- Remove the message from the env queue +redis.call('ZREM', envQueueKey, messageId) + -- Rebalance the parent queue local earliestMessage = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then @@ -1514,7 +1532,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId) }); this.redis.defineCommand("nackMessage", { - numberOfKeys: 7, + numberOfKeys: 8, lua: ` -- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId local messageKey = KEYS[1] @@ -1524,6 +1542,7 @@ local concurrencyKey = KEYS[4] local envConcurrencyKey = KEYS[5] local orgConcurrencyKey = KEYS[6] local visibilityQueue = KEYS[7] +local envQueueKey = KEYS[8] -- Args: childQueueName, messageId, currentTime, messageScore local childQueueName = ARGV[1] @@ -1547,6 +1566,9 @@ end -- Enqueue the message into the queue redis.call('ZADD', childQueueKey, messageScore, messageId) +-- Enqueue the message into the env queue +redis.call('ZADD', envQueueKey, messageScore, messageId) + -- Rebalance the parent queue local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES') if #earliestMessage == 0 then @@ -1729,6 +1751,7 @@ declare module "ioredis" { concurrencyKey: string, envConcurrencyKey: string, orgConcurrencyKey: string, + envQueue: string, queueName: string, messageId: string, messageData: string, @@ -1745,6 +1768,7 @@ declare module "ioredis" { currentConcurrencyKey: string, envCurrentConcurrencyKey: string, orgCurrentConcurrencyKey: string, + envQueueKey: string, childQueueName: string, currentTime: string, defaultEnvConcurrencyLimit: string, @@ -1766,6 +1790,7 @@ declare module "ioredis" { concurrencyKey: string, envConcurrencyKey: string, orgConcurrencyKey: string, + envQueueKey: string, messageId: string, messageQueueName: string, callback?: Callback @@ -1779,6 +1804,7 @@ declare module "ioredis" { envConcurrencyKey: string, orgConcurrencyKey: string, visibilityQueue: string, + envQueueKey: string, childQueueName: string, messageId: string, currentTime: string, diff --git a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts index f40bff66c9..130cae1e14 100644 --- a/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts +++ b/apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts @@ -128,6 +128,16 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer { return [this.envKeySection(env.id), constants.CURRENT_CONCURRENCY_PART].join(":"); } + envQueueKeyFromQueue(queue: string) { + const envId = this.normalizeQueue(queue).split(":")[3]; + + return `${constants.ENV_PART}:${envId}:${constants.QUEUE_PART}`; + } + + envQueueKey(env: AuthenticatedEnvironment): string { + return [constants.ENV_PART, this.shortId(env.id), constants.QUEUE_PART].join(":"); + } + messageKey(messageId: string) { return `${constants.MESSAGE_PART}:${messageId}`; } diff --git a/apps/webapp/app/v3/marqs/types.ts b/apps/webapp/app/v3/marqs/types.ts index afe6c8fef6..7605099b85 100644 --- a/apps/webapp/app/v3/marqs/types.ts +++ b/apps/webapp/app/v3/marqs/types.ts @@ -26,6 +26,7 @@ export interface MarQSKeyProducer { envConcurrencyLimitKey(env: AuthenticatedEnvironment): string; orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string; queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string; + envQueueKey(env: AuthenticatedEnvironment): string; envSharedQueueKey(env: AuthenticatedEnvironment): string; sharedQueueKey(): string; sharedQueueScanPattern(): string; @@ -44,6 +45,7 @@ export interface MarQSKeyProducer { envCurrentConcurrencyKeyFromQueue(queue: string): string; orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string; + envQueueKeyFromQueue(queue: string): string; messageKey(messageId: string): string; stripKeyPrefix(key: string): string; } diff --git a/apps/webapp/app/v3/queueSizeLimits.server.ts b/apps/webapp/app/v3/queueSizeLimits.server.ts new file mode 100644 index 0000000000..152e97c0e8 --- /dev/null +++ b/apps/webapp/app/v3/queueSizeLimits.server.ts @@ -0,0 +1,40 @@ +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { env } from "~/env.server"; +import { MarQS } from "./marqs/index.server"; + +export type QueueSizeGuardResult = { + isWithinLimits: boolean; + maximumSize?: number; + queueSize?: number; +}; + +export async function guardQueueSizeLimitsForEnv( + environment: AuthenticatedEnvironment, + marqs?: MarQS +): Promise { + const maximumSize = getMaximumSizeForEnvironment(environment); + + if (typeof maximumSize === "undefined") { + return { isWithinLimits: true }; + } + + if (!marqs) { + return { isWithinLimits: true, maximumSize }; + } + + const queueSize = await marqs.lengthOfEnvQueue(environment); + + return { + isWithinLimits: queueSize < maximumSize, + maximumSize, + queueSize, + }; +} + +function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined { + if (environment.type === "DEVELOPMENT") { + return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE; + } else { + return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE; + } +} diff --git a/apps/webapp/app/v3/services/triggerTask.server.ts b/apps/webapp/app/v3/services/triggerTask.server.ts index 2eaf769b27..477c502388 100644 --- a/apps/webapp/app/v3/services/triggerTask.server.ts +++ b/apps/webapp/app/v3/services/triggerTask.server.ts @@ -22,6 +22,7 @@ import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server"; import { handleMetadataPacket } from "~/utils/packets"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; +import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server"; export type TriggerTaskServiceOptions = { idempotencyKey?: string; @@ -82,6 +83,24 @@ export class TriggerTaskService extends BaseService { } } + const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs); + + logger.debug("Queue size guard result", { + queueSizeGuard, + environment: { + id: environment.id, + type: environment.type, + organization: environment.organization, + project: environment.project, + }, + }); + + if (!queueSizeGuard.isWithinLimits) { + throw new ServiceValidationError( + `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + ); + } + if ( body.options?.tags && typeof body.options.tags !== "string" && diff --git a/packages/database/prisma/migrations/20241003002757_add_max_queue_sizes_to_org/migration.sql b/packages/database/prisma/migrations/20241003002757_add_max_queue_sizes_to_org/migration.sql new file mode 100644 index 0000000000..d114c1f51d --- /dev/null +++ b/packages/database/prisma/migrations/20241003002757_add_max_queue_sizes_to_org/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "Organization" ADD COLUMN "maximumDeployedQueueSize" INTEGER, +ADD COLUMN "maximumDevQueueSize" INTEGER; diff --git a/packages/database/prisma/schema.prisma b/packages/database/prisma/schema.prisma index 6b25eb5712..fc66fc4b44 100644 --- a/packages/database/prisma/schema.prisma +++ b/packages/database/prisma/schema.prisma @@ -114,6 +114,9 @@ model Organization { /// This is deprecated and will be removed in the future maximumSchedulesLimit Int @default(5) + maximumDevQueueSize Int? + maximumDeployedQueueSize Int? + createdAt DateTime @default(now()) updatedAt DateTime @updatedAt deletedAt DateTime? diff --git a/references/v3-catalog/src/trigger/simple.ts b/references/v3-catalog/src/trigger/simple.ts index 1d5786d727..2152622b2f 100644 --- a/references/v3-catalog/src/trigger/simple.ts +++ b/references/v3-catalog/src/trigger/simple.ts @@ -2,6 +2,7 @@ import "server-only"; import { logger, SubtaskUnwrapError, task, tasks, wait } from "@trigger.dev/sdk/v3"; import { traceAsync } from "@/telemetry.js"; import { HeaderGenerator } from "header-generator"; +import { setTimeout as setTimeoutP } from "node:timers/promises"; let headerGenerator = new HeaderGenerator({ browsers: [{ name: "firefox", minVersion: 90 }, { name: "chrome", minVersion: 110 }, "safari"], @@ -208,3 +209,29 @@ export const childTask = task({ }; }, }); + +export const retryTask = task({ + id: "retry-task", + run: async (payload: any) => { + throw new Error("This task will always fail"); + }, +}); + +export const maximumQueueDepthParent = task({ + id: "maximum-queue-depth-parent", + run: async (payload: any) => { + await maximumQueueDepthChild.trigger({}); + await maximumQueueDepthChild.trigger({}); + await maximumQueueDepthChild.trigger({}); + }, +}); + +export const maximumQueueDepthChild = task({ + id: "maximum-queue-depth-child", + queue: { + concurrencyLimit: 1, + }, + run: async (payload: any) => { + await setTimeoutP(10_000); + }, +});