
Use priority offsets in MarQS and schedule future messages using redis worker #1720


Merged
@ericallam merged 5 commits into main from marqs-improved-priority on Feb 20, 2025

Conversation

@ericallam (Member) commented Feb 19, 2025

Summary by CodeRabbit

  • New Features

    • Introduced an automated requeue mechanism for scheduled messages with configurable retry settings.
    • Added functionality to requeue messages by their unique identifiers.
    • Added new constants for managing time-related functionalities within the MARQS system.
    • Enhanced heartbeat management with new methods for starting and rescheduling tasks.
  • Refactor

    • Streamlined message prioritization by shifting from numeric/enumerated values to simple string identifiers.
    • Removed legacy prioritization logic to optimize queue management.
  • Tests & Chores

    • Removed outdated priority-related test scenarios.
    • Adjusted task timeouts to improve system stability.
    • Added new tests for the FairDequeuingStrategy to enhance coverage.


changeset-bot bot commented Feb 19, 2025

⚠️ No Changeset found

Latest commit: 92d53f8

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.


coderabbitai bot (Contributor) commented Feb 19, 2025

Warning

Rate limit exceeded

@ericallam has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 18 minutes and 26 seconds before requesting another review.


📥 Commits

Reviewing files that changed from the base of the PR and between e8a5785 and 92d53f8.

📒 Files selected for processing (1)
  • apps/webapp/app/v3/marqs/v2.server.ts (1 hunks)

Walkthrough

This pull request introduces a new job, scheduleRequeueMessage, in the legacy run engine worker to handle scheduled message requeues by calling requeueMessageById from the MarQS module. It simplifies priority handling by replacing enum usage with string literals, updates method signatures across services, and removes legacy priority-based dequeuing (and its tests). Changes also include refactoring key producer methods and type interfaces in the MarQS subsystem, as well as adjusting timeout parameters in test tasks.
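
To make the new flow concrete, here is a minimal hedged sketch of what the job registration could look like. The catalog shape and option names are assumptions; only the `messageId` schema field, the 60s visibility timeout, and the 5-attempt retry limit come from this PR.

```ts
import { z } from "zod";

// Placeholder for the MarQS singleton exported from marqs/index.server.ts.
declare const marqs:
  | { requeueMessageById(messageId: string): Promise<void> }
  | undefined;

// Hypothetical catalog entry; the real redis-worker API in this repo may
// differ in shape and option names.
export const scheduleRequeueMessage = {
  schema: z.object({ messageId: z.string() }),
  visibilityTimeoutMs: 60_000, // job is retried if not completed within 60s
  retry: { maxAttempts: 5 },
  handler: async ({ messageId }: { messageId: string }) => {
    await marqs?.requeueMessageById(messageId);
  },
};
```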

Changes

| File(s) | Change Summary |
| --- | --- |
| apps/webapp/app/v3/legacyRunEngineWorker.server.ts | Added a new job scheduleRequeueMessage with its schema (including messageId), a visibility timeout (60,000 ms), and retry settings; imports marqs to call requeueMessageById. |
| apps/webapp/app/v3/marqs/index.server.ts | Updated the MarQS class: introduced requeueMessageById, replaced numeric priority with the new priority type (now handled as a string literal), removed replaceMessage, and refactored the requeue flow. |
| apps/webapp/app/v3/marqs/marqsKeyProducer.ts, apps/webapp/app/v3/marqs/types.ts | Removed priority-related logic: updated method signatures to eliminate priority parameters; introduced a new Zod-based MarQSPriorityLevel type; adjusted interfaces and message payload types. |
| apps/webapp/app/v3/services/* (completeAttempt, createCheckpoint, resumeBatchRun, resumeTaskDependency, taskRunConcurrencyTracker) | Streamlined requeue and enqueue calls: removed MarQSPriorityLevel imports and replaced enum references with string literals (e.g., "resume", "retry"); updated method signatures (e.g., removed oldQueue from messageRequeued). |
| apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts, apps/webapp/test/envPriorityDequeueingStrategy.test.ts | Deleted the legacy EnvPriorityDequeuingStrategy implementation and its accompanying test suite. |
| apps/webapp/test/marqsKeyProducer.test.ts | Removed test cases related to the deprecated priority handling in key producer methods. |
| references/test-tasks/src/trigger/test-reserve-concurrency-system.ts | Adjusted timeout parameters: increased wait timeout for a parent run from 10 to 20 seconds and removed explicit timeouts for certain tasks. |

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Worker as Legacy Run Engine Worker
    participant Scheduler as Job Scheduler
    participant MarQS as MarQS Module
    Worker->>Scheduler: Fetch scheduled job "scheduleRequeueMessage"
    Scheduler-->>Worker: Deliver job payload { messageId }
    Worker->>MarQS: Call requeueMessageById(messageId)
    MarQS-->>Worker: Acknowledge requeued message
```

Suggested reviewers

  • matt-aitken

Poem

I'm a rabbit, hopping through the code,
New jobs and requeues lighten my load.
With messages queued and timeouts aligned,
Priority strings now clearly defined.
From legacy workers to MarQS's call,
Our changes make the system stand tall.
🐰✨ Hop along, and happy coding all!



@coderabbitai (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (3)
apps/webapp/app/v3/marqs/index.server.ts (1)

295-295: Save message only if it exists.

This preserves the existing message record. Consider logging if the message does not exist.
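
One way to implement both behaviors, assuming ioredis and Redis's `SET ... XX` semantics ("set only if the key already exists"); the helper and logger names here are illustrative, not taken from the repo:

```ts
import type { Redis } from "ioredis";

declare const redis: Redis;
declare const logger: { debug(message: string, context?: Record<string, unknown>): void };

// "XX" makes SET a no-op unless the key already exists, so a message that was
// acked or deleted in the meantime is never resurrected; log the skip.
async function saveMessageIfExists(messageKey: string, message: unknown): Promise<void> {
  const result = await redis.set(messageKey, JSON.stringify(message), "XX");
  if (result === null) {
    logger.debug("saveMessageIfExists: message no longer exists, skipping save", { messageKey });
  }
}
```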

apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts (1)

124-147: LGTM! Method signature simplified by removing unused parameter.

The removal of the oldQueue parameter streamlines the interface while maintaining the core functionality.

However, there's a minor issue in the logging message:

The log message in the error case uses "messageReplaced" instead of "messageRequeued". Apply this diff to fix the message:

```diff
-        `TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
+        `TaskRunConcurrencyTracker.messageRequeued(): could not parse message data`,
```
apps/webapp/app/v3/services/completeAttempt.server.ts (1)

617-618: Consider tracking the TODO as a separate issue.

The TODO comment indicates future work to move message scheduling to the LRE redis worker. This would be good to track formally.

Would you like me to create an issue to track this TODO item for moving the message scheduling into the LRE redis worker?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e19bf2a and 1d7834c.

📒 Files selected for processing (13)
  • apps/webapp/app/v3/legacyRunEngineWorker.server.ts (3 hunks)
  • apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (0 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (16 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2 hunks)
  • apps/webapp/app/v3/marqs/types.ts (4 hunks)
  • apps/webapp/app/v3/services/completeAttempt.server.ts (4 hunks)
  • apps/webapp/app/v3/services/createCheckpoint.server.ts (2 hunks)
  • apps/webapp/app/v3/services/resumeBatchRun.server.ts (5 hunks)
  • apps/webapp/app/v3/services/resumeTaskDependency.server.ts (5 hunks)
  • apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts (1 hunks)
  • apps/webapp/test/envPriorityDequeueingStrategy.test.ts (0 hunks)
  • apps/webapp/test/marqsKeyProducer.test.ts (1 hunks)
  • references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (3 hunks)
💤 Files with no reviewable changes (2)
  • apps/webapp/test/envPriorityDequeueingStrategy.test.ts
  • apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (44)
apps/webapp/app/v3/marqs/index.server.ts (22)

14-14: Add reference to SEMATTRS_MESSAGING_OPERATION.

The usage of SEMATTRS_MESSAGING_OPERATION is consistent with the semantic messaging conventions. No issues discovered.


16-16: Added import for flattenAttributes for expanded logging or tracing.

Having a robust approach to flatten message data can be beneficial for instrumentation. Ensure usage remains efficient for large objects.


32-32: New import of MarQSPriorityLevel.

This addition indicates a typed approach to priority handling, improving code clarity.


38-38: Imported legacyRunEngineWorker for scheduling future message requeues.

This import is essential to call the new scheduling job. Good approach for decoupling requeue logic from main queue operations.


198-198: Introduced optional priority parameter.

This improves readability by using a typed enum for priority levels. Be sure to handle undefined cases properly in the method body.


203-203: Inclusion of priority in the message payload.

Providing a dedicated field for priority clarifies the queue ordering mechanism.


222-222: Pass priority field to the messagePayload.

Makes the message's priority explicit for further logic in queue management.


292-292: Retain oldMessage.priority in replaced message.

Ensures that message replacements do not inadvertently lose their original priority.


312-312: Add optional priority parameter to requeueMessage method.

Aligns method with the typed priority approach, ensuring consistent usage throughout the codebase.


324-324: Set new semantic attribute QUEUE for oldMessage.queue.

Maintains trace context about the queue name. This is consistent with messaging instrumentation best practices.


338-341: Merging old data, new data, and trace context.

Combining multiple objects ensures minimal data loss. The ordering also ensures the new data overrides old fields if there's overlap.


349-349: Preserve existing priority when a new priority is not provided.

Ensures consistent priority handling across requeue operations.


353-354: Cancel heartbeat and persist the updated message before requeue.

Cleaning up the heartbeat ensures we don't incorrectly track the old message state, while storing the new message is crucial for consistency.


365-365: Enqueue a future redis job to requeue the message.

Scheduling the requeue handles delayed messages effectively. Verify that timing logic (500ms threshold) aligns with user expectations.
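
To make the timing logic concrete, a sketch of the inline-vs-scheduled decision, under the assumption that the 500ms constant in constants.server.ts gates it (the worker enqueue signature is assumed, not taken from the repo):

```ts
// Assumed constant name, per the 500ms thresholds noted later in this review.
const MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS = 500;

declare const legacyRunEngineWorker: {
  enqueue(opts: {
    id: string;
    job: string;
    payload: { messageId: string };
    availableAt: Date;
  }): Promise<void>;
};

// Requeue inline when the target time is effectively "now"; otherwise let the
// redis worker requeue the message at the scheduled time.
async function requeueAt(
  messageId: string,
  timestamp: number,
  requeueInline: (messageId: string) => Promise<void>
): Promise<void> {
  if (timestamp - Date.now() <= MARQS_DELAYED_REQUEUE_THRESHOLD_IN_MS) {
    await requeueInline(messageId);
  } else {
    await legacyRunEngineWorker.enqueue({
      id: `scheduleRequeueMessage:${messageId}`, // hypothetical dedupe id
      job: "scheduleRequeueMessage",
      payload: { messageId },
      availableAt: new Date(timestamp),
    });
  }
}
```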


387-416: New method for requeueing message by ID.

Enhances flexibility by allowing scheduled requeues without requiring new data. This improves worker-based asynchronous flows with minimal overhead.
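
A plausible shape for the method, given that it loads the stored payload and requeues it without new data (a sketch only; the helper names are assumptions and the real implementation also handles tracing):

```ts
import type { Redis } from "ioredis";

declare const redis: Redis;
declare function messageKey(messageId: string): string;
declare function requeueMessage(
  message: { queue: string; messageId: string; timestamp: number },
  timestamp: number
): Promise<void>;

// Loads the persisted payload and requeues it as-is; a no-op when the
// message has already been acked or removed.
async function requeueMessageById(messageId: string): Promise<void> {
  const raw = await redis.get(messageKey(messageId));
  if (!raw) return;
  const message = JSON.parse(raw) as { queue: string; messageId: string; timestamp: number };
  await requeueMessage(message, message.timestamp);
}
```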


419-419: Creating #saveMessage private method.

Encapsulating message persistence logic is a good approach for code organization.


427-427: Creating #saveMessageIfExists private method.

Ensures messages are only saved if they already exist in Redis. This avoids inadvertently creating new entries.


790-803: Introduced #nudgeTimestampForPriority method.

This offsets the timestamp based on priority to reorder messages. It's a creative approach to prioritize "resume" or "retry" messages. Keep in mind potential collisions if many messages share the same base timestamp.
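
Based on this description and the offset constants confirmed later in the review, the method plausibly reduces to the following (a sketch, not the exact source):

```ts
// Offsets confirmed in constants.server.ts (see the verification below):
const MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET = 31_556_952 * 1000; // ~1 year in ms
const MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET = 15_778_476 * 1000; // ~6 months in ms

type MarQSPriorityLevel = "resume" | "retry";

// Shifts the sorted-set score into the past so "resume" messages dequeue
// ahead of "retry" messages, which dequeue ahead of unprioritized ones.
function nudgeTimestampForPriority(timestamp: number, priority?: MarQSPriorityLevel): number {
  switch (priority) {
    case "resume":
      return timestamp - MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET;
    case "retry":
      return timestamp - MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET;
    default:
      return timestamp;
  }
}
```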


1029-1029: Use #nudgeTimestampForPriority for final message score.

Incorporates priority into the sorted set score, ensuring queue ordering respects priority levels.


1290-1290: Applying priority offset on requeue.

Maintains consistent logic across initial enqueue and requeue operations.


1342-1342: Notify subscriber about message requeue.

This event callback helps track message flow. Ensure subscriber handles requeue logic appropriately, especially for external instrumentation.


1753-1773: RequeueMessage Lua script adjustments.

Added explicit references to concurrency sets and updated sorting. The approach ensures the re-queued message is properly updated in the queue. Validate concurrency sets remain consistent across all command calls.

apps/webapp/app/v3/marqs/types.ts (7)

30-30: Simplify queueKey signature using typed parameters.

Specifying orgId, envId, queue, concurrencyKey clarifies usage and helps avoid passing invalid arguments.


31-31: Overloaded queueKey method with MarQSKeyProducerEnv.

Improves flexibility by enabling direct usage of environment objects in queue key generation.


33-33: Add queueKeyFromQueue method.

Provides a streamlined approach to generate a queue key when only a raw queue string is available.


66-66: Define MarQSPriorityLevel with Zod enum.

Using string literal union ensures type safety and prevents invalid priority assignments.
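
For reference, a minimal sketch of such a Zod-based definition; the two literal values match the "resume" and "retry" priorities used throughout this PR:

```ts
import { z } from "zod";

// A string-literal union replacing the old numeric/enum priority values.
export const MarQSPriorityLevel = z.enum(["resume", "retry"]);
export type MarQSPriorityLevel = z.infer<typeof MarQSPriorityLevel>;
```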


68-68: Export typed priority level.

Helps standardize usage throughout the codebase for consistent priority handling.


85-85: Add optional priority to MessagePayload.

Ensures the schema reflects the newly introduced priority logic while preserving backward compatibility.


96-96: Add messageRequeued event to subscriber interface.

Enables listeners to respond to requeue events, facilitating improved auditing or metrics.

apps/webapp/app/v3/legacyRunEngineWorker.server.ts (3)

10-10: Import marqs for requeue operations.

This aligns the worker with the updated MarQS requeue logic.


53-61: Introduce scheduleRequeueMessage job schema.

Defining a separate job for requeue scheduling with a max of 5 attempts allows controlled repetition, preserving reliability for messages needing future processing.


87-89: Implement scheduleRequeueMessage job logic.

Delegates the requeue to marqs, centralizing queue logic and maintaining a consistent approach to message scheduling.

apps/webapp/app/v3/services/resumeTaskDependency.server.ts (2)

53-71: LGTM! Priority handling has been simplified.

The change from MarQSPriorityLevel.resume to string literal "resume" aligns with the broader refactoring effort to simplify priority handling.


91-105: LGTM! Priority handling has been simplified.

The change from MarQSPriorityLevel.resume to string literal "resume" aligns with the broader refactoring effort to simplify priority handling.

apps/webapp/test/marqsKeyProducer.test.ts (1)

94-94: LGTM! Test case updated to reflect removal of priority handling.

The test case has been correctly updated to match the simplified queue key structure without priority.

apps/webapp/app/v3/marqs/marqsKeyProducer.ts (3)

53-54: LGTM! Method signatures simplified by removing priority parameter.

The removal of the priority parameter aligns with the broader effort to simplify priority handling.


59-59: LGTM! Parameter type updated to reflect priority removal.

The parameter type has been correctly updated to only handle string (concurrencyKey) values.


84-93: LGTM! Implementation simplified by removing priority handling.

The implementation has been correctly updated to match the new method signature without priority.

apps/webapp/app/v3/services/createCheckpoint.server.ts (1)

170-179:

❓ Verification inconclusive

LGTM! Priority level change looks good.

The change from MarQSPriorityLevel.resume to string literal "resume" aligns with the broader refactor of priority handling.

Let's verify that this change is consistent across the codebase:


🏁 Script executed:

```bash
#!/bin/bash
# Search for any remaining uses of MarQSPriorityLevel.resume
rg "MarQSPriorityLevel\.resume"
```

Length of output: 33


Priority change verified – please double-check for consistency

The update in apps/webapp/app/v3/services/createCheckpoint.server.ts (lines 170–179), where MarQSPriorityLevel.resume was replaced with the string literal "resume", appears to be correctly applied. The automated search for "MarQSPriorityLevel\.resume" produced no results, suggesting that all instances have been updated as part of the refactor.

However, since the automated search output was minimal, please perform a manual review of the codebase to ensure no references to MarQSPriorityLevel.resume remain.

apps/webapp/app/v3/services/resumeBatchRun.server.ts (2)

191-209: LGTM! Priority level change in enqueueMessage looks good.

The change from MarQSPriorityLevel.resume to string literal "resume" is consistent with the refactor.


254-272: LGTM! Priority level change in requeueMessage looks good.

The change from MarQSPriorityLevel.resume to string literal "resume" maintains consistency.

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (2)

278-278: Increased timeout for parent task completion.

The timeout has been increased from 10 to 20 seconds, which should help prevent flaky tests that might fail due to timing constraints.


335-335: Removed explicit timeouts for root run completion.

Removing the explicit timeouts will use the default timeout behavior, which is more flexible.

Also applies to: 344-344

apps/webapp/app/v3/services/completeAttempt.server.ts (1)

470-480: LGTM! Priority level change in requeueMessage looks good.

The change from MarQSPriorityLevel.retry to string literal "retry" is consistent with the refactor.

@coderabbitai (Contributor) left a comment

Actionable comments posted: 3

🧹 Nitpick comments (4)
references/test-tasks/src/trigger/helpers.ts (4)

3-46: Improve type safety when referencing batch results
Lines 29 and 36 use as any, which can mask potential type errors. Consider replacing these casts with a well-defined interface or type guard to ensure better maintainability and reliability.

```diff
- const firstRun = batchResult.runs[0] as any;
+ interface RecursiveTaskRun {
+   ok: boolean;
+   // Add other relevant fields if needed
+ }
+ const firstRun = batchResult.runs[0] as RecursiveTaskRun;
```

53-61: Possible alternative to custom setTimeout
Using wait.for({ milliseconds: payload.delayMs }) can improve consistency if other tasks in the system rely on the same waiting strategy.


151-178: Potential duplication with the resume pattern
genericParentTask and resumeParentTask share a similar structure. Consider refactoring common logic to reduce duplication.


180-186: Provide more details in error
If one child run fails, the error message includes limited info. Include details (e.g., run index or run ID) to simplify debugging.

```diff
- throw new Error(`Child task failed: ${batchResult.runs.find((run) => !run.ok)?.error}`);
+ const failedRun = batchResult.runs.find((r) => !r.ok);
+ throw new Error(
+   `Child task failed at run ID: ${failedRun?.id}, error: ${failedRun?.error}`
+ );
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1d7834c and 7a629ae.

📒 Files selected for processing (12)
  • apps/webapp/app/v3/marqs/constants.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (19 hunks)
  • apps/webapp/app/v3/marqs/types.ts (4 hunks)
  • apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts (3 hunks)
  • apps/webapp/app/v3/services/completeAttempt.server.ts (4 hunks)
  • apps/webapp/test/fairDequeuingStrategy.test.ts (2 hunks)
  • internal-packages/redis-worker/src/queue.ts (1 hunks)
  • internal-packages/redis-worker/src/worker.ts (1 hunks)
  • references/test-tasks/src/trigger/helpers.ts (1 hunks)
  • references/test-tasks/src/trigger/test-heartbeats.ts (1 hunks)
  • references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (10 hunks)
  • references/test-tasks/src/utils.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • apps/webapp/app/v3/services/completeAttempt.server.ts
  • apps/webapp/app/v3/marqs/types.ts
⏰ Context from checks skipped due to timeout of 90000ms (1)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (25)
internal-packages/redis-worker/src/worker.ts (1)

166-184: LGTM! Well-structured implementation of the reschedule method.

The implementation is clean, well-documented, and follows the established patterns in the codebase. It properly integrates with the telemetry system and delegates to the queue's reschedule method.
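
For intuition: if the queue stores pending jobs in a sorted set scored by availableAt, the underlying operation is plausibly a ZADD with the XX flag, which only updates members that already exist. A sketch assuming ioredis; the real queue.ts may differ:

```ts
import type { Redis } from "ioredis";

declare const redis: Redis;

// Moves an existing job to a new score (availableAt); with XX, ids not
// already in the sorted set are silently ignored, matching the documented
// "if the job isn't in the queue, it will be ignored" behavior.
async function reschedule(queueKey: string, id: string, availableAt: Date): Promise<void> {
  await redis.zadd(queueKey, "XX", availableAt.getTime(), id);
}
```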

references/test-tasks/src/trigger/helpers.ts (8)

1-2: No concerns on imports
The import structure looks fine and is well-structured.


48-51: Single queue usage looks good
The concurrency limit aligns well with a strictly ordered processing requirement.


63-90: Logic aligns with intended retry mechanism
The handleError implementation correctly adjusts retry behavior. Ensure your test coverage verifies skipping retries vs. adjusting retry delay.


92-110: Duration wait task is well-structured
No issues found. The approach to selectively apply wait.for or promise-based setTimeout is consistent with flexible workflow usage.


112-142: Batch vs. single child task approach is clear
Both paths (batchTriggerAndWait vs. triggerAndWait) are handled consistently, with error checking through either unwrapBatchResult or .unwrap(). Good job ensuring each branch manages errors.


144-149: Simple child task
No issues found. Implementation is straightforward.


188-193: Child task structure is consistent
Implementation matches other child tasks; no issues found here.


195-203: Intentional event loop blocking
This is presumably for testing or demonstration. Just confirm you have coverage ensuring this doesn’t degrade production performance.

references/test-tasks/src/trigger/test-heartbeats.ts (3)

1-5: Import statements look good
All dependencies are properly referenced.


6-17: Simple aggregator task
describeHeartbeats triggers a more detailed testHeartbeats and waits for completion. Make sure you have coverage for timeouts or partial successes.


19-47: Thorough status checks
The steps to wait for "EXECUTING" then require "COMPLETED" are well-defined. Good use of extra time buffer (visibilityTimeoutSeconds + 30).

references/test-tasks/src/utils.ts (1)

9-10: New poll interval parameter is helpful
Introducing pollIntervalMs offers more flexibility for run status checks. Ensure that slower or faster intervals meet performance and resource-usage requirements.

Also applies to: 27-27

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (4)

1-15: LGTM! Good refactoring of task definitions.

Moving task definitions to helpers.js improves code organization and maintainability by separating test utilities from test cases.


72-72: LGTM! Environment-aware retry delay.

Using a longer retry delay (61s) in non-development environments helps prevent flaky tests by accounting for potential system latency.


121-125: LGTM! Enhanced timing assertions.

The additional assertions strengthen the test cases by verifying that:

  1. Failure runs complete after hold runs
  2. Resume runs complete after hold runs

This ensures the correct execution order in priority-based scheduling.

Also applies to: 247-255


311-311:

❓ Verification inconclusive

Verify if the increased timeout is sufficient.

The timeout for waiting for the parent task to complete has been increased from 10s to 20s. While this helps prevent flaky tests, we should verify if this is sufficient across all environments.


🏁 Script executed:

```bash
#!/bin/bash
# Description: Check historical test runs to verify if 20s timeout is sufficient

# Search for test failures related to timeout in the test logs
rg -A 5 "timeout after \d+ seconds" . | grep -B 2 "test/env-reserve-concurrency"
```

Length of output: 80


Action: Verify 20s Timeout Across Environments

The updated timeout does not show any timeout errors in our automated log searches; however, the absence of output isn’t conclusive. Please manually verify in your CI or test log system that the 20s timeout prevents flaky behavior in the "test/env-reserve-concurrency" environment, ensuring it’s sufficient across all setups.

apps/webapp/app/v3/marqs/constants.server.ts (2)

1-2: Consider potential timestamp overflow issues with large offsets.

The priority offsets of 1 year (31,556,952,000ms) and 6 months (15,778,476,000ms) are quite large. When these values are subtracted from timestamps in #nudgeTimestampForPriority, there's a risk of timestamp overflow if the original timestamp is close to 0.

Run this script to check for potential timestamp overflow:

```python
#!/usr/bin/env python3
import time

# Current timestamp
now = int(time.time() * 1000)  # milliseconds
one_year = 31_556_952 * 1000
six_months = 15_778_476 * 1000

# Test cases
print(f"Current timestamp: {now}")
print(f"With 1 year offset: {now - one_year}")
print(f"With 6 months offset: {now - six_months}")
print(f"Near zero timestamp with 1 year offset: {one_year - one_year}")  # Should be 0
print(f"Near zero timestamp with 6 months offset: {six_months - six_months}")  # Should be 0
```

3-4: LGTM! Reasonable threshold values for delayed requeuing.

The 500ms thresholds provide a good balance between immediate and delayed processing of requeued messages.

apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts (2)

6-8: LGTM! Well-designed heartbeat initialization.

The startHeartbeat method in V3GraphileVisibilityTimeout provides a clear separation between starting a new heartbeat and maintaining an existing one.


20-27: LGTM! Consistent heartbeat handling across implementations.

The V3LegacyRunEngineWorkerVisibilityTimeout implementation maintains consistency with the Graphile implementation while using the legacy run engine worker.

apps/webapp/test/fairDequeuingStrategy.test.ts (1)

875-1002: LGTM! Comprehensive test coverage for priority offset handling.

The test thoroughly verifies that:

  • Environments with priority offsets are selected more frequently but not excessively
  • Selection percentages are within reasonable bounds (40% for highest age, <20% for lowest)
  • Maximum environment count limit is respected
apps/webapp/app/v3/marqs/index.server.ts (3)

796-809: LGTM! Clear and efficient priority timestamp handling.

The #nudgeTimestampForPriority method effectively adjusts message timestamps based on priority levels using the defined offsets.


1867-1890: LGTM! Well-structured Redis Lua script for delayed requeuing.

The delayedRequeueMessage Redis command correctly handles:

  • Message data persistence
  • Concurrency set cleanup
  • Atomic operations
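
For readers unfamiliar with how such commands are registered, here is a hedged sketch using ioredis's defineCommand. The Lua body is illustrative only and does not reproduce the actual script in index.server.ts:

```ts
import type { Redis } from "ioredis";

declare const redis: Redis;

// Registers an atomic custom command; both statements run in one script,
// so the persistence and cleanup cannot interleave with other clients.
redis.defineCommand("delayedRequeueMessage", {
  numberOfKeys: 2,
  lua: `
-- KEYS[1] = message key, KEYS[2] = a concurrency set
-- ARGV[1] = message id, ARGV[2] = serialized message data
redis.call('SET', KEYS[1], ARGV[2])   -- persist the message for the future requeue
redis.call('SREM', KEYS[2], ARGV[1])  -- release the message's concurrency slot
`,
});
```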

2147-2160: LGTM! Comprehensive queue priority strategy configuration.

The queuePriorityStrategy configuration effectively balances:

  • Concurrency limits
  • Available capacity
  • Queue age randomization
  • Environment count limits

@ericallam force-pushed the marqs-improved-priority branch from 7a629ae to e8a5785 on February 20, 2025 at 13:55
@coderabbitai (Contributor) left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
apps/webapp/app/v3/marqs/index.server.ts (1)

1359-1418: ⚠️ Potential issue

Add error handling in delayed requeuing.

The #callDelayedRequeueMessage method should handle potential Redis errors when scheduling future requeues.

```diff
 async #callDelayedRequeueMessage(message: MessagePayload) {
+  try {
     const messageKey = this.keys.messageKey(message.messageId);
     // ... existing code ...
     return true;
+  } catch (error) {
+    logger.error("Failed to schedule delayed requeue", {
+      error,
+      messageId: message.messageId,
+      service: this.name,
+    });
+    throw error;
+  }
 }
```
🧹 Nitpick comments (10)
internal-packages/redis-worker/src/worker.ts (1)

166-184: Enhance method documentation and consider error handling.

The implementation looks good, but consider these improvements:

  1. Enhance JSDoc with @param and @returns tags for better IDE support.
  2. Consider adding error handling for cases where the job doesn't exist.

Apply this diff to improve the documentation:

```diff
  /**
   * Reschedules an existing job to a new available date.
   * If the job isn't in the queue, it will be ignored.
+  *
+  * @param id - The unique identifier of the job to reschedule
+  * @param availableAt - The new date when the job should become available
+  * @returns A promise that resolves when the job is rescheduled
+  * @throws {Error} If there's an issue with the Redis operation
   */
```

Consider wrapping the queue operation in a try-catch block:

```diff
   reschedule(id: string, availableAt: Date) {
     return startSpan(
       this.tracer,
       "reschedule",
       async (span) => {
-        return this.queue.reschedule(id, availableAt);
+        try {
+          return await this.queue.reschedule(id, availableAt);
+        } catch (error) {
+          this.logger.error(`Failed to reschedule job ${id}`, {
+            name: this.options.name,
+            id,
+            availableAt,
+            error,
+          });
+          throw error;
+        }
       },
```
apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts (1)

6-12: Eliminate code duplication between startHeartbeat and heartbeat.

Both methods have identical implementations, which violates the DRY principle and could lead to maintenance issues.

Consider refactoring to avoid duplication:

```diff
  async startHeartbeat(messageId: string, timeoutInMs: number): Promise<void> {
    await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
  }

  async heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
-   await TaskRunHeartbeatFailedService.enqueue(messageId, new Date(Date.now() + timeoutInMs));
+   return this.startHeartbeat(messageId, timeoutInMs);
  }
```
references/test-tasks/src/trigger/helpers.ts (3)

3-46: Consider adding exponential backoff or configurable retry

The recursiveTask only retries once. If deeper levels encounter transient failures (e.g., network flakiness), it might be beneficial to allow optional exponential backoff or more flexible retry rules to handle failures gracefully.


180-186: Propagate child task errors more descriptively

Within unwrapBatchResult, you throw an error for the first failed run. Consider logging or consolidating error messages if multiple child tasks fail. This extra detail can ease debugging under heavy batch loads.


195-203: Consider providing a cancellation mechanism

eventLoopLagTask intentionally blocks the event loop for a given duration. Consider adding an early-exit condition (e.g., a cancellation flag) or logs for improved observability. Long event loop blocking may degrade performance for other tasks.

apps/webapp/app/v3/legacyRunEngineWorker.server.ts (1)

10-10: Evaluate import overhead

The direct import of marqs at line 10 can be replaced with a lazy or dynamic import pattern if initialization overhead is a concern. Otherwise, this is fine as-is.

apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2)

53-54: Rename variables to remove references to old priority logic

Now that priority handling is removed, consider renaming concurrencyKeyOrPriority to better reflect its new usage (e.g., maybeConcurrencyKey). This keeps the new logic clear and consistent.

```diff
-    concurrencyKeyOrPriority?: string | number
+    maybeConcurrencyKey?: string
```

Also applies to: 59-59


84-84: Double-check concurrencyKey defaults

In queueKeyFromQueue, descriptor.concurrencyKey can be undefined. Ensure calling code gracefully handles scenarios where concurrency keys are unnecessary. Providing a default or fallback helps prevent runtime errors when concurrency keys are irrelevant.

Also applies to: 91-91

apps/webapp/test/fairDequeuingStrategy.test.ts (1)

875-1002: Consider refactoring duplicated test logic.

The new test case largely duplicates code from the previous test. Consider extracting common test setup and assertions into shared helper functions to improve maintainability.

```diff
+ async function setupEnvQueues(
+   envSetups: Array<{ envId: string; queues: Array<{ age: number }> }>,
+   keyProducer: MarQSKeyProducer,
+   redis: Redis,
+   now: number // passed in explicitly; the helper awaits, so it must be async
+ ) {
+   for (const setup of envSetups) {
+     await setupConcurrency({
+       redis,
+       keyProducer,
+       env: { id: setup.envId, currentConcurrency: 0, limit: 5 },
+     });
+
+     for (let i = 0; i < setup.queues.length; i++) {
+       await setupQueue({
+         redis,
+         keyProducer,
+         parentQueue: "parent-queue",
+         score: now - setup.queues[i].age,
+         queueId: `queue-${setup.envId}-${i}`,
+         orgId: `org-${setup.envId}`,
+         envId: setup.envId,
+       });
+     }
+   }
+ }
+
+ function verifyEnvSelection(
+   selectedEnvCounts: Record<string, number>,
+   expectations: {
+     highPriorityEnv: string;
+     lowPriorityEnv: string;
+     highPriorityThreshold: number;
+     lowPriorityThreshold: number;
+   }
+ ) {
+   const { highPriorityEnv, lowPriorityEnv, highPriorityThreshold, lowPriorityThreshold } =
+     expectations;
+
+   // Calculate selection percentages
+   const totalSelections = Object.values(selectedEnvCounts).reduce((a, b) => a + b, 0);
+   const selectionPercentages = Object.entries(selectedEnvCounts).reduce(
+     (acc, [envId, count]) => {
+       acc[envId] = (count / totalSelections) * 100;
+       return acc;
+     },
+     {} as Record<string, number>
+   );
+
+   // Verify selection percentages
+   expect(selectionPercentages[highPriorityEnv]).toBeGreaterThan(highPriorityThreshold);
+   expect(selectionPercentages[lowPriorityEnv] || 0).toBeLessThan(lowPriorityThreshold);
+ }
```
apps/webapp/app/v3/marqs/index.server.ts (1)

1867-1890: Verify Redis command cleanup.

The new delayedRequeueMessage Redis command correctly cleans up concurrency sets, but consider adding a TTL to the message data to prevent orphaned messages.

```diff
 redis.call('SET', messageKey, messageData)
+redis.call('EXPIRE', messageKey, 86400) -- 24 hour TTL
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7a629ae and e8a5785.

📒 Files selected for processing (21)
  • apps/webapp/app/v3/legacyRunEngineWorker.server.ts (3 hunks)
  • apps/webapp/app/v3/marqs/constants.server.ts (1 hunks)
  • apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts (0 hunks)
  • apps/webapp/app/v3/marqs/index.server.ts (19 hunks)
  • apps/webapp/app/v3/marqs/marqsKeyProducer.ts (2 hunks)
  • apps/webapp/app/v3/marqs/types.ts (4 hunks)
  • apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts (3 hunks)
  • apps/webapp/app/v3/services/completeAttempt.server.ts (4 hunks)
  • apps/webapp/app/v3/services/createCheckpoint.server.ts (2 hunks)
  • apps/webapp/app/v3/services/resumeBatchRun.server.ts (5 hunks)
  • apps/webapp/app/v3/services/resumeTaskDependency.server.ts (4 hunks)
  • apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts (1 hunks)
  • apps/webapp/test/envPriorityDequeueingStrategy.test.ts (0 hunks)
  • apps/webapp/test/fairDequeuingStrategy.test.ts (2 hunks)
  • apps/webapp/test/marqsKeyProducer.test.ts (1 hunks)
  • internal-packages/redis-worker/src/queue.ts (1 hunks)
  • internal-packages/redis-worker/src/worker.ts (1 hunks)
  • references/test-tasks/src/trigger/helpers.ts (1 hunks)
  • references/test-tasks/src/trigger/test-heartbeats.ts (1 hunks)
  • references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (10 hunks)
  • references/test-tasks/src/utils.ts (2 hunks)
💤 Files with no reviewable changes (2)
  • apps/webapp/test/envPriorityDequeueingStrategy.test.ts
  • apps/webapp/app/v3/marqs/envPriorityDequeuingStrategy.server.ts
🚧 Files skipped from review as they are similar to previous changes (11)
  • internal-packages/redis-worker/src/queue.ts
  • apps/webapp/app/v3/services/completeAttempt.server.ts
  • apps/webapp/app/v3/services/resumeTaskDependency.server.ts
  • apps/webapp/app/v3/marqs/constants.server.ts
  • apps/webapp/test/marqsKeyProducer.test.ts
  • apps/webapp/app/v3/services/resumeBatchRun.server.ts
  • apps/webapp/app/v3/marqs/types.ts
  • apps/webapp/app/v3/services/createCheckpoint.server.ts
  • apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts
  • references/test-tasks/src/utils.ts
  • references/test-tasks/src/trigger/test-heartbeats.ts
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - pnpm)
  • GitHub Check: units / 🧪 Unit Tests
  • GitHub Check: e2e / 🧪 CLI v3 tests (ubuntu-latest - npm)
  • GitHub Check: typecheck / typecheck
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (7)
apps/webapp/app/v3/marqs/v3VisibilityTimeout.server.ts (1)

20-34: LGTM! Clear separation between creating and rescheduling heartbeats.

The implementation correctly differentiates between:

  • startHeartbeat: Creates a new heartbeat job
  • heartbeat: Reschedules an existing heartbeat job

This separation provides better control over the heartbeat lifecycle.
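
A sketch of that split, assuming enqueue/reschedule methods like the redis-worker ones reviewed above; the job name and id scheme are hypothetical:

```ts
declare const legacyRunEngineWorker: {
  enqueue(opts: {
    id: string;
    job: string;
    payload: { messageId: string };
    availableAt: Date;
  }): Promise<void>;
  reschedule(id: string, availableAt: Date): Promise<void>;
};

// First heartbeat: create the timeout job with a stable, message-derived id.
async function startHeartbeat(messageId: string, timeoutInMs: number): Promise<void> {
  await legacyRunEngineWorker.enqueue({
    id: `heartbeat:${messageId}`, // hypothetical id scheme
    job: "taskRunHeartbeatFailed", // hypothetical job name
    payload: { messageId },
    availableAt: new Date(Date.now() + timeoutInMs),
  });
}

// Subsequent heartbeats: push the same job's deadline out instead of
// enqueueing a duplicate (reschedule ignores ids not in the queue).
async function heartbeat(messageId: string, timeoutInMs: number): Promise<void> {
  await legacyRunEngineWorker.reschedule(
    `heartbeat:${messageId}`,
    new Date(Date.now() + timeoutInMs)
  );
}
```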

references/test-tasks/src/trigger/helpers.ts (1)

112-142: Ensure concurrency-limiting logic is fully tested

resumeParentTask enforces a concurrency limit of 1 with queue: { concurrencyLimit: 1 }. If concurrency is critical here, confirm that test coverage includes scenarios where multiple tasks might trigger at once. Let me know if you need help adding concurrency tests.

apps/webapp/app/v3/legacyRunEngineWorker.server.ts (1)

53-61: Confirm scheduling constraints

scheduleRequeueMessage uses a 60-second visibility timeout and 5 maximum retries. Verify if this configuration adequately handles slow network or extended processing times, especially under large loads. Testing or monitoring will help confirm suitability.

references/test-tasks/src/trigger/test-reserve-concurrency-system.ts (3)

72-72: Adjust retry delay based on environment type.

The retry delay is now dynamically set based on the environment type, with a shorter delay (10s) for development and longer delay (61s) for other environments. This helps reduce test execution time in development while maintaining realistic timing in other environments.


121-125: Verify completion order with new assertion.

Good addition of completion order verification to ensure the failure run completes after the hold run. This helps validate the correct execution sequence of the retry priority system.


368-368: Verify impact of removing explicit timeouts.

The removal of explicit timeouts in waitForRunStatus calls might lead to indefinite waits. Consider if default timeouts are appropriate for these cases.

Also applies to: 377-377

apps/webapp/app/v3/marqs/index.server.ts (1)

796-809:

✅ Verification successful

Verify priority offset calculations.

The priority offset calculation logic looks correct, but ensure that the offset values (MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET and MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET) are large enough to prevent race conditions in high-load scenarios.


🏁 Script executed:

```bash
#!/bin/bash
# Check the offset values in constants
rg "MARQS_.*_PRIORITY_TIMESTAMP_OFFSET.*=.*" apps/webapp/app/v3/marqs/constants.server.ts
```

Length of output: 258


Priority Offset Calculation Verified

The offset constants in apps/webapp/app/v3/marqs/constants.server.ts are set as follows:

  • MARQS_RESUME_PRIORITY_TIMESTAMP_OFFSET → 1 year (31,556,952 * 1000)
  • MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET → 6 months (15,778,476 * 1000)

The function #nudgeTimestampForPriority in apps/webapp/app/v3/marqs/index.server.ts properly applies these values based on the priority, and the chosen offsets appear sufficiently large to prevent race conditions in high-load scenarios.

No changes are necessary.

@ericallam merged commit bf43fd4 into main on Feb 20, 2025
11 checks passed
@ericallam deleted the marqs-improved-priority branch on February 20, 2025 at 14:06