Skip to content

Add max queue depth limits #1376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ const EnvironmentSchema = z.object({
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB

MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
34 changes: 30 additions & 4 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ export class MarQS {
return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey));
}

public async lengthOfEnvQueue(env: AuthenticatedEnvironment) {
return this.redis.zcard(this.keys.envQueueKey(env));
}

public async oldestMessageInQueue(
env: AuthenticatedEnvironment,
queue: string,
Expand Down Expand Up @@ -1074,6 +1078,7 @@ export class MarQS {
concurrencyKey,
envConcurrencyKey,
orgConcurrencyKey,
this.keys.envQueueKeyFromQueue(message.queue),
message.queue,
message.messageId,
JSON.stringify(message),
Expand Down Expand Up @@ -1111,6 +1116,7 @@ export class MarQS {
currentConcurrencyKey,
envCurrentConcurrencyKey,
orgCurrentConcurrencyKey,
this.keys.envQueueKeyFromQueue(messageQueue),
messageQueue,
String(Date.now()),
String(this.options.defaultEnvConcurrency),
Expand Down Expand Up @@ -1187,6 +1193,7 @@ export class MarQS {
concurrencyKey,
envConcurrencyKey,
orgConcurrencyKey,
this.keys.envQueueKeyFromQueue(messageQueue),
messageId,
messageQueue
);
Expand Down Expand Up @@ -1234,6 +1241,7 @@ export class MarQS {
envConcurrencyKey,
orgConcurrencyKey,
visibilityQueue,
this.keys.envQueueKeyFromQueue(messageQueue),
messageQueue,
messageId,
String(Date.now()),
Expand Down Expand Up @@ -1347,14 +1355,15 @@ export class MarQS {

#registerCommands() {
this.redis.defineCommand("enqueueMessage", {
numberOfKeys: 6,
numberOfKeys: 7,
lua: `
local queue = KEYS[1]
local parentQueue = KEYS[2]
local messageKey = KEYS[3]
local concurrencyKey = KEYS[4]
local envCurrentConcurrencyKey = KEYS[5]
local orgCurrentConcurrencyKey = KEYS[6]
local envQueue = KEYS[7]

local queueName = ARGV[1]
local messageId = ARGV[2]
Expand All @@ -1367,6 +1376,9 @@ redis.call('SET', messageKey, messageData)
-- Add the message to the queue
redis.call('ZADD', queue, messageScore, messageId)

-- Add the message to the env queue
redis.call('ZADD', envQueue, messageScore, messageId)

-- Rebalance the parent queue
local earliestMessage = redis.call('ZRANGE', queue, 0, 0, 'WITHSCORES')
if #earliestMessage == 0 then
Expand All @@ -1383,7 +1395,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
});

this.redis.defineCommand("dequeueMessage", {
numberOfKeys: 8,
numberOfKeys: 9,
lua: `
-- Keys: childQueue, parentQueue, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, currentConcurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
local childQueue = KEYS[1]
Expand All @@ -1394,6 +1406,7 @@ local orgConcurrencyLimitKey = KEYS[5]
local currentConcurrencyKey = KEYS[6]
local envCurrentConcurrencyKey = KEYS[7]
local orgCurrentConcurrencyKey = KEYS[8]
local envQueueKey = KEYS[9]

-- Args: childQueueName, currentTime, defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit
local childQueueName = ARGV[1]
Expand Down Expand Up @@ -1438,6 +1451,7 @@ local messageScore = tonumber(messages[2])

-- Move message to timeout queue and update concurrency
redis.call('ZREM', childQueue, messageId)
redis.call('ZREM', envQueueKey, messageId)
redis.call('SADD', currentConcurrencyKey, messageId)
redis.call('SADD', envCurrentConcurrencyKey, messageId)
redis.call('SADD', orgCurrentConcurrencyKey, messageId)
Expand Down Expand Up @@ -1474,7 +1488,7 @@ redis.call('SET', messageKey, messageData, 'GET')
});

this.redis.defineCommand("acknowledgeMessage", {
numberOfKeys: 7,
numberOfKeys: 8,
lua: `
-- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
local parentQueue = KEYS[1]
Expand All @@ -1484,6 +1498,7 @@ local visibilityQueue = KEYS[4]
local concurrencyKey = KEYS[5]
local envCurrentConcurrencyKey = KEYS[6]
local orgCurrentConcurrencyKey = KEYS[7]
local envQueueKey = KEYS[8]

-- Args: messageId, messageQueueName
local messageId = ARGV[1]
Expand All @@ -1495,6 +1510,9 @@ redis.call('DEL', messageKey)
-- Remove the message from the queue
redis.call('ZREM', messageQueue, messageId)

-- Remove the message from the env queue
redis.call('ZREM', envQueueKey, messageId)

-- Rebalance the parent queue
local earliestMessage = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES')
if #earliestMessage == 0 then
Expand All @@ -1514,7 +1532,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
});

this.redis.defineCommand("nackMessage", {
numberOfKeys: 7,
numberOfKeys: 8,
lua: `
-- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId
local messageKey = KEYS[1]
Expand All @@ -1524,6 +1542,7 @@ local concurrencyKey = KEYS[4]
local envConcurrencyKey = KEYS[5]
local orgConcurrencyKey = KEYS[6]
local visibilityQueue = KEYS[7]
local envQueueKey = KEYS[8]

-- Args: childQueueName, messageId, currentTime, messageScore
local childQueueName = ARGV[1]
Expand All @@ -1547,6 +1566,9 @@ end
-- Enqueue the message into the queue
redis.call('ZADD', childQueueKey, messageScore, messageId)

-- Enqueue the message into the env queue
redis.call('ZADD', envQueueKey, messageScore, messageId)

-- Rebalance the parent queue
local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES')
if #earliestMessage == 0 then
Expand Down Expand Up @@ -1729,6 +1751,7 @@ declare module "ioredis" {
concurrencyKey: string,
envConcurrencyKey: string,
orgConcurrencyKey: string,
envQueue: string,
queueName: string,
messageId: string,
messageData: string,
Expand All @@ -1745,6 +1768,7 @@ declare module "ioredis" {
currentConcurrencyKey: string,
envCurrentConcurrencyKey: string,
orgCurrentConcurrencyKey: string,
envQueueKey: string,
childQueueName: string,
currentTime: string,
defaultEnvConcurrencyLimit: string,
Expand All @@ -1766,6 +1790,7 @@ declare module "ioredis" {
concurrencyKey: string,
envConcurrencyKey: string,
orgConcurrencyKey: string,
envQueueKey: string,
messageId: string,
messageQueueName: string,
callback?: Callback<void>
Expand All @@ -1779,6 +1804,7 @@ declare module "ioredis" {
envConcurrencyKey: string,
orgConcurrencyKey: string,
visibilityQueue: string,
envQueueKey: string,
childQueueName: string,
messageId: string,
currentTime: string,
Expand Down
10 changes: 10 additions & 0 deletions apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
return [this.envKeySection(env.id), constants.CURRENT_CONCURRENCY_PART].join(":");
}

envQueueKeyFromQueue(queue: string) {
const envId = this.normalizeQueue(queue).split(":")[3];

return `${constants.ENV_PART}:${envId}:${constants.QUEUE_PART}`;
}

envQueueKey(env: AuthenticatedEnvironment): string {
return [constants.ENV_PART, this.shortId(env.id), constants.QUEUE_PART].join(":");
}

messageKey(messageId: string) {
return `${constants.MESSAGE_PART}:${messageId}`;
}
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/v3/marqs/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export interface MarQSKeyProducer {
envConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string;
envQueueKey(env: AuthenticatedEnvironment): string;
envSharedQueueKey(env: AuthenticatedEnvironment): string;
sharedQueueKey(): string;
sharedQueueScanPattern(): string;
Expand All @@ -44,6 +45,7 @@ export interface MarQSKeyProducer {
envCurrentConcurrencyKeyFromQueue(queue: string): string;
orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string;
envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string;
envQueueKeyFromQueue(queue: string): string;
messageKey(messageId: string): string;
stripKeyPrefix(key: string): string;
}
Expand Down
40 changes: 40 additions & 0 deletions apps/webapp/app/v3/queueSizeLimits.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { env } from "~/env.server";
import { MarQS } from "./marqs/index.server";

export type QueueSizeGuardResult = {
isWithinLimits: boolean;
maximumSize?: number;
queueSize?: number;
};

export async function guardQueueSizeLimitsForEnv(
environment: AuthenticatedEnvironment,
marqs?: MarQS
): Promise<QueueSizeGuardResult> {
const maximumSize = getMaximumSizeForEnvironment(environment);

if (typeof maximumSize === "undefined") {
return { isWithinLimits: true };
}

if (!marqs) {
return { isWithinLimits: true, maximumSize };
}

const queueSize = await marqs.lengthOfEnvQueue(environment);

return {
isWithinLimits: queueSize < maximumSize,
maximumSize,
queueSize,
};
}

function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined {
if (environment.type === "DEVELOPMENT") {
return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE;
} else {
return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE;
}
}
19 changes: 19 additions & 0 deletions apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
import { handleMetadataPacket } from "~/utils/packets";
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -82,6 +83,24 @@ export class TriggerTaskService extends BaseService {
}
}

const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs);

logger.debug("Queue size guard result", {
queueSizeGuard,
environment: {
id: environment.id,
type: environment.type,
organization: environment.organization,
project: environment.project,
},
});

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
);
}

if (
body.options?.tags &&
typeof body.options.tags !== "string" &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "Organization" ADD COLUMN "maximumDeployedQueueSize" INTEGER,
ADD COLUMN "maximumDevQueueSize" INTEGER;
3 changes: 3 additions & 0 deletions packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ model Organization {
/// This is deprecated and will be removed in the future
maximumSchedulesLimit Int @default(5)

maximumDevQueueSize Int?
maximumDeployedQueueSize Int?

createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
deletedAt DateTime?
Expand Down
27 changes: 27 additions & 0 deletions references/v3-catalog/src/trigger/simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import "server-only";
import { logger, SubtaskUnwrapError, task, tasks, wait } from "@trigger.dev/sdk/v3";
import { traceAsync } from "@/telemetry.js";
import { HeaderGenerator } from "header-generator";
import { setTimeout as setTimeoutP } from "node:timers/promises";

let headerGenerator = new HeaderGenerator({
browsers: [{ name: "firefox", minVersion: 90 }, { name: "chrome", minVersion: 110 }, "safari"],
Expand Down Expand Up @@ -208,3 +209,29 @@ export const childTask = task({
};
},
});

export const retryTask = task({
id: "retry-task",
run: async (payload: any) => {
throw new Error("This task will always fail");
},
});

export const maximumQueueDepthParent = task({
id: "maximum-queue-depth-parent",
run: async (payload: any) => {
await maximumQueueDepthChild.trigger({});
await maximumQueueDepthChild.trigger({});
await maximumQueueDepthChild.trigger({});
},
});

export const maximumQueueDepthChild = task({
id: "maximum-queue-depth-child",
queue: {
concurrencyLimit: 1,
},
run: async (payload: any) => {
await setTimeoutP(10_000);
},
});