diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index fe7411e50b..77adfa44e7 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -567,6 +567,7 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"), + RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"), RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1), RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3), RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 7c9b07dcc8..2f201b7cdb 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -77,6 +77,7 @@ function createRunEngine() { }, releaseConcurrency: { disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0", + disableConsumers: env.RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS === "1", maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO, maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES, consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index b4f6bff232..55ac2f258c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -201,6 +201,7 @@ export class RunEngine { options.releaseConcurrency.disabled ? undefined : { + disableConsumers: options.releaseConcurrency?.disableConsumers, redis: { ...options.queue.redis, // Use base queue redis options ...options.releaseConcurrency?.redis, // Allow overrides diff --git a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts index fcdfb774e3..44849445b1 100644 --- a/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts +++ b/internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts @@ -28,6 +28,7 @@ export type ReleaseConcurrencyQueueOptions = { pollInterval?: number; batchSize?: number; retry?: ReleaseConcurrencyQueueRetryOptions; + disableConsumers?: boolean; }; const QueueItemMetadata = z.object({ @@ -74,7 +75,10 @@ export class ReleaseConcurrencyTokenBucketQueue { }; this.#registerCommands(); - this.#startConsumers(); + + if (!options.disableConsumers) { + this.#startConsumers(); + } } public async quit() { @@ -93,6 +97,12 @@ export class ReleaseConcurrencyTokenBucketQueue { const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor); if (maxTokens === 0) { + this.logger.debug("No tokens available, skipping release", { + releaseQueueDescriptor, + releaserId, + maxTokens, + }); + return; } @@ -109,6 +119,14 @@ export class ReleaseConcurrencyTokenBucketQueue { String(Date.now()) ); + this.logger.debug("Consumed token in attemptToRelease", { + releaseQueueDescriptor, + releaserId, + maxTokens, + result, + releaseQueue, + }); + if (!!result) { await this.#callExecutor(releaseQueueDescriptor, releaserId, { retryCount: 0, @@ -119,6 +137,7 @@ export class ReleaseConcurrencyTokenBucketQueue { releaseQueueDescriptor, releaserId, maxTokens, + releaseQueue, }); } } @@ -130,13 +149,19 @@ export class ReleaseConcurrencyTokenBucketQueue { */ public async consumeToken(releaseQueueDescriptor: T, releaserId: string) { const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor); + const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); if (maxTokens === 0) { + this.logger.debug("No tokens available, skipping consume", { + releaseQueueDescriptor, + releaserId, + maxTokens, + releaseQueue, + }); + return; } - const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); - await this.redis.consumeToken( this.masterQueuesKey, this.#bucketKey(releaseQueue), @@ -147,6 +172,13 @@ export class ReleaseConcurrencyTokenBucketQueue { String(maxTokens), String(Date.now()) ); + + this.logger.debug("Consumed token in consumeToken", { + releaseQueueDescriptor, + releaserId, + maxTokens, + releaseQueue, + }); } /** @@ -157,6 +189,11 @@ export class ReleaseConcurrencyTokenBucketQueue { public async returnToken(releaseQueueDescriptor: T, releaserId: string) { const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); + this.logger.debug("Returning token in returnToken", { + releaseQueueDescriptor, + releaserId, + }); + await this.redis.returnTokenOnly( this.masterQueuesKey, this.#bucketKey(releaseQueue), @@ -165,6 +202,12 @@ export class ReleaseConcurrencyTokenBucketQueue { releaseQueue, releaserId ); + + this.logger.debug("Returned token in returnToken", { + releaseQueueDescriptor, + releaserId, + releaseQueue, + }); } /** @@ -177,10 +220,20 @@ export class ReleaseConcurrencyTokenBucketQueue { const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor); if (amount < 0) { + this.logger.debug("Cannot refill with negative tokens", { + releaseQueueDescriptor, + amount, + }); + throw new Error("Cannot refill with negative tokens"); } if (amount === 0) { + this.logger.debug("Cannot refill with 0 tokens", { + releaseQueueDescriptor, + amount, + }); + return []; } @@ -192,6 +245,13 @@ export class ReleaseConcurrencyTokenBucketQueue { String(amount), String(maxTokens) ); + + this.logger.debug("Refilled tokens in refillTokens", { + releaseQueueDescriptor, + releaseQueue, + amount, + maxTokens, + }); } /** diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 114f38b05e..c7837eadd4 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -51,6 +51,7 @@ export type RunEngineOptions = { maxDelay?: number; // Defaults to 60000 factor?: number; // Defaults to 2 }; + disableConsumers?: boolean; }; }; diff --git a/packages/cli-v3/e2e/utils.ts b/packages/cli-v3/e2e/utils.ts index 9c65ed450c..be158ef599 100644 --- a/packages/cli-v3/e2e/utils.ts +++ b/packages/cli-v3/e2e/utils.ts @@ -314,6 +314,15 @@ export async function executeTestCaseRun({ version: "1.0.0", contentHash, }, + machine: { + name: "small-1x", + cpu: 1, + memory: 256, + centsPerMs: 0.0000001, + }, + }).initialize(); + + const result = await taskRunProcess.execute({ payload: { traceContext: { traceparent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", @@ -374,10 +383,6 @@ export async function executeTestCaseRun({ messageId: "run_1234", }); - await taskRunProcess.initialize(); - - const result = await taskRunProcess.execute(); - await taskRunProcess.cleanup(true); return {