Add MarQS requeueMessage, an atomic version of replaceMessage #1717
Conversation
Walkthrough

The changes refactor how the messaging system handles message updates and requeuing.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant S as Service (e.g., CompleteAttemptService)
    participant M as MarQS
    participant R as Redis
    participant C as TaskRunConcurrencyTracker
    S->>M: Call requeueMessage(messageId, messageData, timestamp)
    M->>R: Execute Redis command requeueMessage
    R-->>M: Return acknowledgement
    M->>C: Trigger messageRequeued(oldQueue, message)
    C-->>M: Log and process message data
```
Actionable comments posted: 1
🧹 Nitpick comments (1)
apps/webapp/app/v3/marqs/index.server.ts (1)
1204-1274: Add error handling for the internal #callRequeueMessage
This helper updates Redis to requeue the message and logs the result, always returning true. Consider handling unexpected Redis errors or null responses in the future.
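To make the nitpick concrete, here is a hypothetical sketch of the suggested error handling; the helper name and the shape of the Redis call are illustrative assumptions, not the actual MarQS internals:

```typescript
// Hypothetical wrapper: treat a thrown error or a null reply as a failed
// requeue instead of unconditionally returning true.
type RedisRequeue = (messageId: string) => Promise<string | null>;

async function callRequeueMessageSafely(
  requeue: RedisRequeue,
  messageId: string
): Promise<boolean> {
  try {
    const reply = await requeue(messageId);
    if (reply === null) {
      // A null reply can mean the message key was missing; report failure.
      return false;
    }
    return true;
  } catch (error) {
    // Surface unexpected Redis errors to the caller rather than swallowing them.
    console.error(`requeueMessage failed for ${messageId}`, error);
    return false;
  }
}
```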
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)

- apps/webapp/app/v3/marqs/index.server.ts (7 hunks)
- apps/webapp/app/v3/marqs/types.ts (1 hunks)
- apps/webapp/app/v3/services/completeAttempt.server.ts (2 hunks)
- apps/webapp/app/v3/services/createCheckpoint.server.ts (2 hunks)
- apps/webapp/app/v3/services/resumeBatchRun.server.ts (1 hunks)
- apps/webapp/app/v3/services/resumeTaskDependency.server.ts (1 hunks)
- apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- apps/webapp/app/v3/services/completeAttempt.server.ts
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - pnpm)
- GitHub Check: e2e / 🧪 CLI v3 tests (windows-latest - npm)
- GitHub Check: typecheck / typecheck
- GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (11)
apps/webapp/app/v3/marqs/index.server.ts (5)
274-306: Clarify data layering when preserving trace context

The final spread of traceContext could overwrite fields in messageData if they share keys like traceparent or tracestate. Confirm this layering is the intended behavior.
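A minimal demonstration of the layering concern (field values are invented for illustration): in an object spread, the later source wins on shared keys, so a trailing traceContext spread overwrites any trace fields already present in the message data.

```typescript
// The later spread wins on shared keys.
const messageData = { traceparent: "00-old-trace", payload: "original" };
const traceContext = { traceparent: "00-new-trace", tracestate: "vendor=1" };

const merged = { ...messageData, ...traceContext };
// merged.traceparent is "00-new-trace": the preserved trace context wins.
// Reversing the spread order would let the incoming message data win instead.
```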
310-365: Requeue logic appears valid; confirm behavior for identical queues

This new requeueMessage method handles old-message retrieval, sets new data, and re-queues. For clarity, verify the scenario where the old and new queues are the same.
643-643: Confirm timestamp usage when replacing message

Using the retryAt value as the new timestamp adjusts scheduling. Double-check that this aligns with existing retry semantics and doesn’t introduce unexpected timing logic.
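The intent behind using retryAt as the timestamp can be sketched as follows (the function name is an illustrative assumption, not from the source): the sorted-set score decides when the message becomes eligible for dequeue, so scoring it at retryAt keeps it out of circulation until the retry window opens.

```typescript
// Sketch: prefer the retry deadline as the score, falling back to "now"
// when no retry delay applies.
function retryScore(retryAt: Date | undefined, now: number = Date.now()): number {
  return retryAt ? retryAt.getTime() : now;
}
```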
1700-1754: New Redis requeueMessage command

The Lua script removes the message from the old queue, re-enqueues it in the new queue, then rebalances. Confirm that removing and re-adding the same queue reference is intentional if the queue names match.
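A purely in-memory model of the steps the review attributes to the Lua script may make the same-queue question easier to reason about; this is illustrative only (the real command runs inside Redis as an atomic script, and rebalancing is left out):

```typescript
// Model of ZREM from the old queue followed by ZADD to the new queue.
type SortedSet = Map<string, number>; // member -> score

function requeueAtomically(
  queues: Map<string, SortedSet>,
  oldQueue: string,
  newQueue: string,
  messageId: string,
  score: number
): void {
  // ZREM oldQueue messageId
  queues.get(oldQueue)?.delete(messageId);
  // ZADD newQueue score messageId
  const target = queues.get(newQueue) ?? new Map<string, number>();
  target.set(messageId, score);
  queues.set(newQueue, target);
  // When oldQueue === newQueue, delete-then-set reduces to a score update,
  // so matching queue names are harmless in this model; the real script
  // should be checked for the same property.
}
```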
260-264: ✅ Verification successful

Simplify method signature and check usage updates

Removing the priority and inplace parameters yields a cleaner interface for replaceMessage, but verify that no external calls depend on them. Run the script below to locate any remaining references to the old signature:

🏁 Script executed:

```shell
#!/bin/bash
rg -A 5 'replaceMessage'
```

Length of output: 3292
Signature Simplification Verified – No External Dependencies on Removed Parameters

The search confirms that all external calls to replaceMessage only pass the three arguments (messageId, messageData, and timestamp), and there are no invocations relying on the removed priority or inplace parameters.

- Verified in apps/webapp/app/v3/services/createCheckpoint.server.ts that the calls use the new two/three argument structure.
- Internal calls in apps/webapp/app/v3/marqs/index.server.ts are consistent with the simplified signature.

apps/webapp/app/v3/marqs/types.ts (1)
103-103: New messageRequeued interface method

Adding messageRequeued contributes clear handling for requeue operations. Ensure all implementations of MessageQueueSubscriber provide consistent logic or no-ops as needed.

apps/webapp/app/v3/services/resumeTaskDependency.server.ts (1)
92-107: Switched from replaceMessage to requeueMessage

Using requeueMessage clears concurrency sets and re-injects the message with updated attributes, which can better handle long-lived dependencies. Confirm there are no side effects if the dependent attempt relies on old concurrency data.

apps/webapp/app/v3/services/createCheckpoint.server.ts (2)
170-179: LGTM! Atomic message requeuing for duration-based resumption.

The change to use requeueMessage for scheduling message resumption after a duration is appropriate and maintains the atomic nature of the operation.
300-302: LGTM! Simplified message replacement for batch waiting.

The simplified replaceMessage call is appropriate here as it only needs to update the checkpoint event ID while maintaining the message in place.

apps/webapp/app/v3/services/resumeBatchRun.server.ts (2)
255-273: LGTM! Atomic message requeuing for dependent task resumption.

The change to use requeueMessage for resuming dependent tasks is appropriate and maintains the atomic nature of the operation. The original queue timestamp is correctly used to maintain message ordering.
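A small illustration of why reusing the original timestamp preserves ordering (the run IDs and scores are invented): dequeue order follows the sorted-set score, not re-insertion time.

```typescript
// run_a was enqueued first (score 100), then requeued with its ORIGINAL score.
const queue = [
  { id: "run_b", score: 200 }, // enqueued later
  { id: "run_a", score: 100 }, // requeued, keeping its original score
];

const dequeueOrder = [...queue].sort((x, y) => x.score - y.score).map((m) => m.id);
// run_a still comes out first; using Date.now() as the new score would
// instead push it behind run_b.
```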
332-349: LGTM! Well-structured batch run enqueue method.

The new static enqueue method is well-designed:

- Takes all necessary parameters including transaction support
- Provides flexible job key handling
- Supports scheduled execution through optional runAt
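A hedged sketch of a static enqueue method with the shape the review describes (transaction support, flexible job key, optional runAt); all names here are illustrative assumptions, not the actual service code:

```typescript
// Hypothetical job and transaction shapes for illustration.
interface Job {
  name: string;
  payload: Record<string, unknown>;
  jobKey: string;
  runAt?: Date;
}

interface Tx {
  enqueueJob(job: Job): void;
}

class ResumeBatchRunExample {
  static enqueue(
    batchRunId: string,
    tx: Tx,
    options: { jobKey?: string; runAt?: Date } = {}
  ): Job {
    const job: Job = {
      name: "resumeBatchRun",
      payload: { batchRunId },
      // A stable job key deduplicates repeated enqueues for the same batch.
      jobKey: options.jobKey ?? `resume-batch:${batchRunId}`,
      // Omitting runAt makes the job eligible immediately.
      runAt: options.runAt,
    };
    tx.enqueueJob(job);
    return job;
  }
}
```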
```typescript
async messageRequeued(oldQueue: string, message: MessagePayload): Promise<void> {
  logger.debug("TaskRunConcurrencyTracker.messageRequeued()", {
    data: message.data,
    messageId: message.messageId,
    oldQueue,
  });

  const data = this.getMessageData(message);

  if (!data) {
    logger.info(
      `TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
      message
    );
    return;
  }

  await this.executionFinished({
    projectId: data.projectId,
    taskId: data.taskIdentifier,
    runId: message.messageId,
    environmentId: data.environmentId,
    deployed: data.environmentType !== "DEVELOPMENT",
  });
}
```
Fix incorrect error message in messageRequeued method.
The error message uses "messageReplaced" instead of "messageRequeued" which could be confusing for debugging.
Apply this diff to fix the error message:
```diff
-      `TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
+      `TaskRunConcurrencyTracker.messageRequeued(): could not parse message data`,
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```typescript
async messageRequeued(oldQueue: string, message: MessagePayload): Promise<void> {
  logger.debug("TaskRunConcurrencyTracker.messageRequeued()", {
    data: message.data,
    messageId: message.messageId,
    oldQueue,
  });

  const data = this.getMessageData(message);

  if (!data) {
    logger.info(
      `TaskRunConcurrencyTracker.messageRequeued(): could not parse message data`,
      message
    );
    return;
  }

  await this.executionFinished({
    projectId: data.projectId,
    taskId: data.taskIdentifier,
    runId: message.messageId,
    environmentId: data.environmentId,
    deployed: data.environmentType !== "DEVELOPMENT",
  });
}
```
Summary by CodeRabbit
New Features
Refactor