-
-
Notifications
You must be signed in to change notification settings - Fork 705
engine v1 improved dequeue selection algorithm #1632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Warning Rate limit exceeded@ericallam has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 18 minutes and 47 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (1)
WalkthroughThis pull request introduces a comprehensive overhaul of the queue management system in the web application, focusing on implementing a new Changes
Possibly related PRs
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🔭 Outside diff range comments (1)
apps/webapp/app/v3/marqs/v2.server.ts (1)
Line range hint
113-114
: Possible redundant delay due to duplicatesetTimeout
calls.In the
V2QueueConsumer
'sstart
method, there are two identicalsetTimeout
calls causing double the intended delay. Verify if both delays are necessary; otherwise, remove one to prevent unintended startup latency.
🧹 Nitpick comments (11)
apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts (2)
59-81
: Simplify thequeueKey
method to reduce complexity and improve readability.The
queueKey
method handles multiple overloads by using union types and parameter position overloading, which increases complexity and potential for errors. Consider refactoring the method into separate functions or using more descriptive parameter structures to enhance maintainability.
190-196
: Add validation toorgIdFromQueue
andenvIdFromQueue
methods.The methods
orgIdFromQueue
andenvIdFromQueue
assume that the queue string follows a specific format and may throw errors if the format is unexpected. Consider adding validation and error handling to manage malformed queue strings and prevent runtime exceptions.apps/webapp/app/v3/marqs/v2.server.ts (2)
68-69
: Consider reusing an existing Redis client instance.Creating a new Redis client with
new Redis(redisOptions)
may lead to multiple connections and increased resource usage. If possible, reuse an existing Redis client instance to optimize performance and resource consumption.
Line range hint
225-228
: Use the application's logging mechanism instead ofconsole.log
.Replace the
console.log
statement withlogger.debug
or an appropriate logging method to maintain consistent logging practices within the application.apps/webapp/app/v3/tracer.server.ts (2)
104-109
: Consider using 'logger.error' for error logging in 'startActiveSpan'.In the catch block of
startActiveSpan
, the error is logged usinglogger.debug
, which may not appropriately reflect the severity of the error. Usinglogger.error
would ensure that errors are properly logged and can be more easily identified in logs.Apply this diff to change the log level:
span.setStatus({ code: SpanStatusCode.ERROR, message: error instanceof Error ? error.message : String(error), }); - logger.debug(`Error in span: ${name}`, { error }); + logger.error(`Error in span: ${name}`, { error });
118-149
: Remove unnecessary 'async' keyword from synchronous logging functions.The functions
emitDebugLog
,emitInfoLog
,emitErrorLog
, andemitWarnLog
do not contain anyawait
expressions and execute synchronously. Theasync
keyword is unnecessary and can be removed to improve clarity.Apply this diff to remove the
async
keyword:-export async function emitDebugLog(message: string, params: Record<string, unknown> = {}) { +export function emitDebugLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.DEBUG, body: message, attributes: { ...flattenAttributes(params, "params") }, }); } -export async function emitInfoLog(message: string, params: Record<string, unknown> = {}) { +export function emitInfoLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.INFO, body: message, attributes: { ...flattenAttributes(params, "params") }, }); } -export async function emitErrorLog(message: string, params: Record<string, unknown> = {}) { +export function emitErrorLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.ERROR, body: message, attributes: { ...flattenAttributes(params, "params") }, }); } -export async function emitWarnLog(message: string, params: Record<string, unknown> = {}) { +export function emitWarnLog(message: string, params: Record<string, unknown> = {}) { otelLogger.emit({ severityNumber: SeverityNumber.WARN, body: message, attributes: { ...flattenAttributes(params, "params") }, }); }apps/webapp/test/fairDequeuingStrategy.test.ts (1)
574-595
: Add assertions to test the distribution bias in 'should bias shuffling based on concurrency limits and available capacity'.The test logs the distribution results but lacks assertions to verify if the biasing behaves as expected. Including assertions will ensure the test validates the functionality properly.
Consider adding assertions to check that environments with higher concurrency limits and more available capacity are selected more frequently.
For example:
// Verify distribution across all strategies expect(highLimitPercentage).toBeLessThan(60); expect(lowLimitPercentage).toBeGreaterThan(10); expect(highLimitPercentage).toBeGreaterThan(lowLimitPercentage); + // Assert that 'env-1' (highest limit, most capacity) is selected most frequently + expect(avgDistribution["env-1"]).toBeGreaterThan(avgDistribution["env-2"]); + expect(avgDistribution["env-2"]).toBeGreaterThan(avgDistribution["env-3"]);This strengthens the test by explicitly checking the expected biasing behavior.
apps/webapp/app/v3/marqs/index.server.ts (3)
73-73
: Consider adding validation for 'redis' initializationWhile assigning
this.redis = options.redis
is straightforward, adding validation to ensureredis
is properly initialized could prevent potential runtime errors.
236-240
: Handle cases when no queues are availableIn the
dequeueMessageInEnv
method, ifqueues
is empty, the for-loop is still executed. Consider adding a condition to return early when there are no queues to process.Apply this diff to check for empty queues:
const queues = await this.options.envQueuePriorityStrategy.distributeFairQueuesFromParentQueue( parentQueue, env.id ); + if (queues.length === 0) { + return; + } span.setAttribute("queue_count", queues.length);
330-330
: Add a check for empty queues in 'dequeueMessageInSharedQueue'Similar to the previous comment, if
queues
is empty in thedequeueMessageInSharedQueue
method, the code still proceeds to the for-loop. Consider adding an early return to improve efficiency.Apply this diff to handle empty queues:
const queues = await this.options.queuePriorityStrategy.distributeFairQueuesFromParentQueue( parentQueue, consumerId ); + if (queues.length === 0) { + return; + } span.setAttribute("queue_count", queues.length);apps/webapp/app/v3/tracing.server.ts (1)
35-81
: Reduce code duplication in log emission functionsThe functions
emitDebugLog
,emitInfoLog
,emitErrorLog
, andemitWarnLog
share similar structures. Consider refactoring them into a single function to adhere to the DRY (Don't Repeat Yourself) principle.You could refactor as follows:
export async function emitLog( logger: Logger, severityNumber: SeverityNumber, message: string, params: Record<string, unknown> = {} ) { logger.emit({ severityNumber, body: message, attributes: { ...flattenAttributes(params, "params") }, }); }Then, redefine the specific functions:
export async function emitDebugLog( logger: Logger, message: string, params: Record<string, unknown> = {} ) { await emitLog(logger, SeverityNumber.DEBUG, message, params); } // Similarly for emitInfoLog, emitErrorLog, and emitWarnLog
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml
is excluded by!**/pnpm-lock.yaml
📒 Files selected for processing (15)
.vscode/launch.json
(1 hunks)apps/webapp/app/env.server.ts
(2 hunks)apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts
(1 hunks)apps/webapp/app/v3/marqs/index.server.ts
(8 hunks)apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts
(4 hunks)apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts
(0 hunks)apps/webapp/app/v3/marqs/types.ts
(2 hunks)apps/webapp/app/v3/marqs/v2.server.ts
(2 hunks)apps/webapp/app/v3/tracer.server.ts
(7 hunks)apps/webapp/app/v3/tracing.server.ts
(1 hunks)apps/webapp/package.json
(1 hunks)apps/webapp/test/fairDequeuingStrategy.test.ts
(1 hunks)apps/webapp/test/utils/marqs.ts
(1 hunks)references/v3-catalog/src/trigger/batch.ts
(3 hunks)references/v3-catalog/trigger.config.ts
(1 hunks)
💤 Files with no reviewable changes (1)
- apps/webapp/app/v3/marqs/simpleWeightedPriorityStrategy.server.ts
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
🔇 Additional comments (18)
apps/webapp/package.json (2)
68-68
: Verify the OpenTelemetry package version.The added package version aligns with other OpenTelemetry packages in the project. Let's verify it's the latest stable version and check for any known vulnerabilities.
✅ Verification successful
Version 0.52.1 is correct and consistent with the project's OpenTelemetry setup
The version 0.52.1 of
@opentelemetry/api-logs
is part of a coordinated set of OpenTelemetry packages used consistently across multiple packages in the project (webapp, core, cli-v3). While newer versions exist, maintaining version alignment between related OpenTelemetry packages is crucial for compatibility.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check npm registry for latest version and any known vulnerabilities # Get latest version from npm echo "Latest version from npm:" npm view @opentelemetry/api-logs version # Check for known vulnerabilities echo -e "\nVulnerability check:" npm audit @opentelemetry/[email protected]Length of output: 557
Script:
#!/bin/bash # Find all package.json files and grep for OpenTelemetry packages echo "OpenTelemetry packages across all package.json files:" fd package.json --type f --exec sh -c 'echo "\n=== {} ==="; cat {} | grep -i opentelemetry'Length of output: 4734
68-68
: Verify compatibility with existing OpenTelemetry packages.The package appears to be compatible with other OpenTelemetry packages that share the same version (0.52.1). However, some core packages are on version 1.25.1. Let's verify there are no compatibility issues.
✅ Verification successful
OpenTelemetry package versions are correctly aligned
The current version setup follows OpenTelemetry's versioning strategy where:
- Logging packages (including api-logs) are consistently at v0.52.1
- Core packages at v1.25.1 are stable and maintain backward compatibility
- Mixed versions are expected and supported by design
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check for any reported compatibility issues between OpenTelemetry packages # Search for any compatibility notes in the package documentation echo "Checking package documentation for compatibility notes:" npm view @opentelemetry/api-logs readme | grep -i "compatibility" # List all installed OpenTelemetry packages and their versions for reference echo -e "\nInstalled OpenTelemetry packages:" npm list | grep @opentelemetryLength of output: 327
Script:
#!/bin/bash # Get all OpenTelemetry dependencies from package.json echo "OpenTelemetry dependencies in package.json:" cat apps/webapp/package.json | jq -r '.dependencies | with_entries(select(.key | startswith("@opentelemetry"))) | to_entries | .[] | "\(.key): \(.value)"' # Get detailed package information echo -e "\nPackage information for @opentelemetry/api-logs:" npm view @opentelemetry/api-logs version description repository.url # Get the latest version information echo -e "\nLatest version information:" npm view @opentelemetry/api-logs versions --json | jq -r '.[-3:]' # Check peer dependencies echo -e "\nPeer dependencies:" npm view @opentelemetry/api-logs peerDependenciesLength of output: 1395
apps/webapp/app/v3/marqs/types.ts (1)
Line range hint
8-47
: Method overloads correctly implemented for enhanced flexibility.The addition of method overloads in the
MarQSKeyProducer
interface allows methods to accept either astring
or anAuthenticatedEnvironment
, improving flexibility and usability.apps/webapp/app/v3/marqs/index.server.ts (8)
16-16
: Confirm the necessity of the 'Result' type importThe line imports
type Result
from"ioredis"
. Please verify ifResult
is used within the codebase. If it's not utilized, consider removing it to keep the code clean.
48-48
: Ensure type consistency for 'redis' in 'MarQSOptions'The
redis
property type inMarQSOptions
has been updated toRedis
. Ensure that all instances whereMarQSOptions
is used are updated accordingly and that theRedis
instance is being correctly initialized and passed.
55-56
: Update references to 'MarQSFairDequeueStrategy'The properties
queuePriorityStrategy
andenvQueuePriorityStrategy
now useMarQSFairDequeueStrategy
. Please confirm that all references and implementations are updated to match this new type and that it aligns with the expected interface.
270-271
: Verify the time units for message timing attributesThe attributes
message_timestamp
andmessage_age
are calculated usingmessage.timestamp
andDate.now()
. Ensure that both values are in milliseconds to accurately reflect the message age.
372-373
: Ensure accurate calculation of 'message_age'As with previous timing attributes, confirm that
message.timestamp
andDate.now()
are in the same time unit (milliseconds) for themessage_age
calculation to be accurate.
1607-1609
: Verify Redis and Key Producer initializationThe Redis client and
MarQSShortKeyProducer
are initialized here. Ensure thatredisOptions
correctly reflects the necessary configuration and that the Redis connection is successful.
1616-1627
: Validate 'FairDequeuingStrategy' configuration parametersThe
FairDequeuingStrategy
is being configured with environment variables and biases. Please confirm that all environment variables (MARQS_SHARED_QUEUE_LIMIT
,MARQS_CONCURRENCY_LIMIT_BIAS
, etc.) are properly set and that the biases are tuned to achieve the desired queue behavior.
1629-1641
: Review bias settings for development environment strategyIn the
envQueuePriorityStrategy
, biases are set to specific values (0.0
for concurrency and capacity biases,0.1
for queue age randomization). Ensure these settings are intentional for the development environment and that they support the desired testing scenarios.apps/webapp/app/v3/tracing.server.ts (1)
5-33
: Ensure proper error handling in 'startSpan' functionThe
startSpan
function correctly handles errors and sets the span status. Verify that usingspan.end()
in thefinally
block is appropriate and does not conflict with the span's lifecycle, especially if the span might already be ended elsewhere.references/v3-catalog/trigger.config.ts (1)
16-17
: Confirm the updated project identifierThe
project
value has been changed to"proj_oveexhsnehnarnusktpc"
. Please verify that this is the correct project ID and that any systems depending on this value are updated accordingly.apps/webapp/test/utils/marqs.ts (2)
5-7
: LGTM! Clean factory function implementation.The function follows single responsibility principle and has proper type annotations.
100-126
: LGTM! Well-implemented statistical function.The implementation is mathematically correct, handles edge cases properly, and has excellent documentation.
apps/webapp/app/env.server.ts (1)
202-203
: LGTM! Well-structured environment configuration changes.The new environment variables are properly typed and have reasonable default values. The bias parameters (0.75, 0.3, 0.25) provide good initial values for the fair dequeuing strategy.
Also applies to: 226-231
✅ Verification successful
✅ INTERNAL_OTEL_LOG_EXPORTER_URL is properly implemented and used
The environment variable is correctly used in the OpenTelemetry setup (tracer.server.ts) to conditionally configure log exporting with proper error handling and logging.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Verify the usage of new environment variables in the codebase echo "Checking usage of new environment variables..." # Check for MARQS_SHARED_QUEUE_LIMIT usage rg "MARQS_SHARED_QUEUE_LIMIT" -A 3 # Check for bias parameters usage rg "MARQS_CONCURRENCY_LIMIT_BIAS|MARQS_AVAILABLE_CAPACITY_BIAS|MARQS_QUEUE_AGE_RANDOMIZATION_BIAS" -A 3Length of output: 1930
Script:
#!/bin/bash # Search for OTEL log exporter usage # Check for exact env var name rg "INTERNAL_OTEL_LOG_EXPORTER_URL" -A 5 # Look for OTEL logging related patterns rg -i "opentelemetry|otel.*log|LogExporter" -A 3 # Check for potential configuration files fd -e ts -e js -e json "otel|telemetry" --exec grep -l "log"Length of output: 67462
references/v3-catalog/src/trigger/batch.ts (1)
301-307
: LGTM! Good enhancement to batch processing flexibility.The addition of
largeBatchSize
parameter improves configurability while maintaining backward compatibility with the default value of 21.Also applies to: 613-624
.vscode/launch.json (1)
25-33
: LGTM! Well-configured debug setup.The new debug configuration for fairDequeuingStrategy tests follows the established pattern and includes all necessary settings.
@trigger.dev/build
@trigger.dev/rsc
@trigger.dev/react-hooks
@trigger.dev/sdk
trigger.dev
@trigger.dev/core
commit: |
This improves the engine v1 selection algorithm to be env based instead of queue based. This means that envs with a higher number of queues don't crowd out envs with fewer queues.
We've also stopped pre-calculating queue concurrency and capacity as this is no longer needed (and happens during dequeuing anyways).
More of the logic for selecting the queues to try and dequeue from have been moved out of MarQS and into an external class called
FairDequeuingStrategy
. I've also added tests to make sure this works, including simulating a large number of "distributions" to make sure envs are fairly chosen. A few other things thing class does:age DESC
ordering