Skip to content

Commit 23b4c2a

Browse files
authored
Add MarQS requeueMessage, an atomic version of replaceMessage (#1717)
* New MarQS method requeueMessage, an atomic version of replace message * Remove redundant call to remove the message from the env queue in requeueMessage
1 parent 7b1159e commit 23b4c2a

7 files changed

+236
-32
lines changed

apps/webapp/app/v3/marqs/index.server.ts

+201-18
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,7 @@ export class MarQS {
260260
public async replaceMessage(
261261
messageId: string,
262262
messageData: Record<string, unknown>,
263-
timestamp?: number,
264-
priority?: number,
265-
inplace?: boolean
263+
timestamp?: number
266264
) {
267265
return this.#trace(
268266
"replaceMessage",
@@ -273,6 +271,57 @@ export class MarQS {
273271
return;
274272
}
275273

274+
span.setAttributes({
275+
[SemanticAttributes.QUEUE]: oldMessage.queue,
276+
[SemanticAttributes.MESSAGE_ID]: oldMessage.messageId,
277+
[SemanticAttributes.CONCURRENCY_KEY]: oldMessage.concurrencyKey,
278+
[SemanticAttributes.PARENT_QUEUE]: oldMessage.parentQueue,
279+
});
280+
281+
const traceContext = {
282+
traceparent: oldMessage.data.traceparent,
283+
tracestate: oldMessage.data.tracestate,
284+
};
285+
286+
const newMessage: MessagePayload = {
287+
version: "1",
288+
// preserve original trace context
289+
data: { ...oldMessage.data, ...messageData, ...traceContext, queue: oldMessage.queue },
290+
queue: oldMessage.queue,
291+
concurrencyKey: oldMessage.concurrencyKey,
292+
timestamp: timestamp ?? Date.now(),
293+
messageId,
294+
parentQueue: oldMessage.parentQueue,
295+
};
296+
297+
await this.#callReplaceMessage(newMessage);
298+
},
299+
{
300+
kind: SpanKind.CONSUMER,
301+
attributes: {
302+
[SEMATTRS_MESSAGING_OPERATION]: "replace",
303+
[SEMATTRS_MESSAGE_ID]: messageId,
304+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
305+
},
306+
}
307+
);
308+
}
309+
310+
public async requeueMessage(
311+
messageId: string,
312+
messageData: Record<string, unknown>,
313+
timestamp?: number,
314+
priority?: number
315+
) {
316+
return this.#trace(
317+
"requeueMessage",
318+
async (span) => {
319+
const oldMessage = await this.readMessage(messageId);
320+
321+
if (!oldMessage) {
322+
return;
323+
}
324+
276325
const queue = this.keys.queueKeyFromQueue(oldMessage.queue, priority);
277326

278327
span.setAttributes({
@@ -298,27 +347,16 @@ export class MarQS {
298347
parentQueue: oldMessage.parentQueue,
299348
};
300349

301-
if (inplace) {
302-
await this.#callReplaceMessage(newMessage);
303-
return;
304-
}
305-
306350
await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);
307351

308-
await this.#callAcknowledgeMessage({
309-
parentQueue: oldMessage.parentQueue,
310-
messageQueue: oldMessage.queue,
311-
messageId,
312-
});
352+
await this.#callRequeueMessage(oldMessage.queue, newMessage);
313353

314-
await this.#callEnqueueMessage(newMessage);
315-
316-
await this.options.subscriber?.messageReplaced(newMessage);
354+
await this.options.subscriber?.messageRequeued(oldMessage.queue, newMessage);
317355
},
318356
{
319357
kind: SpanKind.CONSUMER,
320358
attributes: {
321-
[SEMATTRS_MESSAGING_OPERATION]: "replace",
359+
[SEMATTRS_MESSAGING_OPERATION]: "requeue",
322360
[SEMATTRS_MESSAGE_ID]: messageId,
323361
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
324362
},
@@ -602,7 +640,7 @@ export class MarQS {
602640
});
603641

604642
if (updates) {
605-
await this.replaceMessage(messageId, updates, retryAt, undefined, true);
643+
await this.replaceMessage(messageId, updates, retryAt);
606644
}
607645

608646
await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);
@@ -1163,6 +1201,78 @@ export class MarQS {
11631201
);
11641202
}
11651203

1204+
async #callRequeueMessage(oldQueue: string, message: MessagePayload) {
1205+
const queueKey = message.queue;
1206+
const oldQueueKey = oldQueue;
1207+
const parentQueueKey = message.parentQueue;
1208+
const messageKey = this.keys.messageKey(message.messageId);
1209+
const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKeyFromQueue(message.queue);
1210+
const queueReserveConcurrencyKey = this.keys.queueReserveConcurrencyKeyFromQueue(message.queue);
1211+
const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
1212+
const envReserveConcurrencyKey = this.keys.envReserveConcurrencyKeyFromQueue(message.queue);
1213+
const envQueueKey = this.keys.envQueueKeyFromQueue(message.queue);
1214+
1215+
const queueName = message.queue;
1216+
const oldQueueName = oldQueue;
1217+
const messageId = message.messageId;
1218+
const messageData = JSON.stringify(message);
1219+
const messageScore = String(message.timestamp);
1220+
1221+
logger.debug("Calling requeueMessage", {
1222+
service: this.name,
1223+
queueKey,
1224+
oldQueueKey,
1225+
parentQueueKey,
1226+
messageKey,
1227+
queueCurrentConcurrencyKey,
1228+
queueReserveConcurrencyKey,
1229+
envCurrentConcurrencyKey,
1230+
envReserveConcurrencyKey,
1231+
envQueueKey,
1232+
queueName,
1233+
oldQueueName,
1234+
messageId,
1235+
messageData,
1236+
messageScore,
1237+
});
1238+
1239+
const result = await this.redis.requeueMessage(
1240+
queueKey,
1241+
oldQueueKey,
1242+
parentQueueKey,
1243+
messageKey,
1244+
queueCurrentConcurrencyKey,
1245+
queueReserveConcurrencyKey,
1246+
envCurrentConcurrencyKey,
1247+
envReserveConcurrencyKey,
1248+
envQueueKey,
1249+
queueName,
1250+
oldQueueName,
1251+
messageId,
1252+
messageData,
1253+
messageScore
1254+
);
1255+
1256+
logger.debug("requeueMessage result", {
1257+
service: this.name,
1258+
queueKey,
1259+
parentQueueKey,
1260+
messageKey,
1261+
queueCurrentConcurrencyKey,
1262+
queueReserveConcurrencyKey,
1263+
envCurrentConcurrencyKey,
1264+
envReserveConcurrencyKey,
1265+
envQueueKey,
1266+
queueName,
1267+
messageId,
1268+
messageData,
1269+
messageScore,
1270+
result,
1271+
});
1272+
1273+
return true;
1274+
}
1275+
11661276
async #callAcknowledgeMessage({
11671277
parentQueue,
11681278
messageQueue,
@@ -1587,6 +1697,61 @@ redis.call('DEL', messageKey)
15871697
`,
15881698
});
15891699

1700+
this.redis.defineCommand("requeueMessage", {
1701+
numberOfKeys: 9,
1702+
lua: `
1703+
local queueKey = KEYS[1]
1704+
local oldQueueKey = KEYS[2]
1705+
local parentQueueKey = KEYS[3]
1706+
local messageKey = KEYS[4]
1707+
local queueCurrentConcurrencyKey = KEYS[5]
1708+
local queueReserveConcurrencyKey = KEYS[6]
1709+
local envCurrentConcurrencyKey = KEYS[7]
1710+
local envReserveConcurrencyKey = KEYS[8]
1711+
local envQueueKey = KEYS[9]
1712+
1713+
local queueName = ARGV[1]
1714+
local oldQueueName = ARGV[2]
1715+
local messageId = ARGV[3]
1716+
local messageData = ARGV[4]
1717+
local messageScore = ARGV[5]
1718+
1719+
-- First remove the message from the old queue
1720+
redis.call('ZREM', oldQueueKey, messageId)
1721+
1722+
-- Write the new message data
1723+
redis.call('SET', messageKey, messageData)
1724+
1725+
-- Add the message to the new queue with a new score
1726+
redis.call('ZADD', queueKey, messageScore, messageId)
1727+
redis.call('ZADD', envQueueKey, messageScore, messageId)
1728+
1729+
-- Rebalance the parent queue (for the new queue)
1730+
local earliestMessage = redis.call('ZRANGE', queueKey, 0, 0, 'WITHSCORES')
1731+
if #earliestMessage == 0 then
1732+
redis.call('ZREM', parentQueueKey, queueName)
1733+
else
1734+
redis.call('ZADD', parentQueueKey, earliestMessage[2], queueName)
1735+
end
1736+
1737+
-- Rebalance the parent queue (for the old queue)
1738+
local earliestMessage = redis.call('ZRANGE', oldQueueKey, 0, 0, 'WITHSCORES')
1739+
if #earliestMessage == 0 then
1740+
redis.call('ZREM', parentQueueKey, oldQueueName)
1741+
else
1742+
redis.call('ZADD', parentQueueKey, earliestMessage[2], oldQueueName)
1743+
end
1744+
1745+
-- Clear all concurrency sets (combined from both scripts)
1746+
redis.call('SREM', queueCurrentConcurrencyKey, messageId)
1747+
redis.call('SREM', queueReserveConcurrencyKey, messageId)
1748+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1749+
redis.call('SREM', envReserveConcurrencyKey, messageId)
1750+
1751+
return true
1752+
`,
1753+
});
1754+
15901755
this.redis.defineCommand("nackMessage", {
15911756
numberOfKeys: 7,
15921757
lua: `
@@ -1749,6 +1914,24 @@ declare module "ioredis" {
17491914
callback?: Callback<void>
17501915
): Result<void, Context>;
17511916

1917+
requeueMessage(
1918+
queueKey: string,
1919+
oldQueueKey: string,
1920+
parentQueueKey: string,
1921+
messageKey: string,
1922+
queueCurrentConcurrencyKey: string,
1923+
queueReserveConcurrencyKey: string,
1924+
envCurrentConcurrencyKey: string,
1925+
envReserveConcurrencyKey: string,
1926+
envQueueKey: string,
1927+
queueName: string,
1928+
oldQueueName: string,
1929+
messageId: string,
1930+
messageData: string,
1931+
messageScore: string,
1932+
callback?: Callback<string>
1933+
): Result<string, Context>;
1934+
17521935
acknowledgeMessage(
17531936
parentQueue: string,
17541937
messageKey: string,

apps/webapp/app/v3/marqs/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ export interface MessageQueueSubscriber {
100100
messageAcked(message: MessagePayload): Promise<void>;
101101
messageNacked(message: MessagePayload): Promise<void>;
102102
messageReplaced(message: MessagePayload): Promise<void>;
103+
messageRequeued(oldQueue: string, message: MessagePayload): Promise<void>;
103104
}
104105

105106
export interface VisibilityTimeoutStrategy {

apps/webapp/app/v3/services/completeAttempt.server.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ export class CompleteAttemptService extends BaseService {
468468
logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });
469469

470470
// We have to replace a potential RESUME with EXECUTE to correctly retry the attempt
471-
return marqs?.replaceMessage(
471+
return marqs?.requeueMessage(
472472
run.id,
473473
{
474474
type: "EXECUTE",
@@ -615,7 +615,7 @@ export class CompleteAttemptService extends BaseService {
615615
});
616616

617617
if (environment.type === "DEVELOPMENT") {
618-
marqs.replaceMessage(
618+
marqs.requeueMessage(
619619
taskRunAttempt.taskRunId,
620620
{},
621621
executionRetry.timestamp,

apps/webapp/app/v3/services/createCheckpoint.server.ts

+4-10
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ export class CreateCheckpointService extends BaseService {
167167
});
168168

169169
if (checkpointEvent) {
170-
await marqs?.replaceMessage(
170+
await marqs.requeueMessage(
171171
attempt.taskRunId,
172172
{
173173
type: "RESUME_AFTER_DURATION",
@@ -297,15 +297,9 @@ export class CreateCheckpointService extends BaseService {
297297
}
298298

299299
//if there's a message in the queue, we make sure the checkpoint event is on it
300-
await marqs?.replaceMessage(
301-
attempt.taskRun.id,
302-
{
303-
checkpointEventId: checkpointEvent.id,
304-
},
305-
undefined,
306-
undefined,
307-
true
308-
);
300+
await marqs.replaceMessage(attempt.taskRun.id, {
301+
checkpointEventId: checkpointEvent.id,
302+
});
309303

310304
await ResumeBatchRunService.enqueue(
311305
batchRun.id,

apps/webapp/app/v3/services/resumeBatchRun.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ export class ResumeBatchRunService extends BaseService {
252252
hasCheckpointEvent: !!batchRun.checkpointEventId,
253253
});
254254

255-
await marqs?.replaceMessage(
255+
await marqs?.requeueMessage(
256256
dependentRun.id,
257257
{
258258
type: "RESUME",

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ export class ResumeTaskDependencyService extends BaseService {
8989
return;
9090
}
9191

92-
await marqs?.replaceMessage(
92+
await marqs?.requeueMessage(
9393
dependentRun.id,
9494
{
9595
type: "RESUME",

apps/webapp/app/v3/services/taskRunConcurrencyTracker.server.ts

+26
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,32 @@ class TaskRunConcurrencyTracker implements MessageQueueSubscriber {
121121
});
122122
}
123123

124+
async messageRequeued(oldQueue: string, message: MessagePayload): Promise<void> {
125+
logger.debug("TaskRunConcurrencyTracker.messageRequeued()", {
126+
data: message.data,
127+
messageId: message.messageId,
128+
oldQueue,
129+
});
130+
131+
const data = this.getMessageData(message);
132+
133+
if (!data) {
134+
logger.info(
135+
`TaskRunConcurrencyTracker.messageReplaced(): could not parse message data`,
136+
message
137+
);
138+
return;
139+
}
140+
141+
await this.executionFinished({
142+
projectId: data.projectId,
143+
taskId: data.taskIdentifier,
144+
runId: message.messageId,
145+
environmentId: data.environmentId,
146+
deployed: data.environmentType !== "DEVELOPMENT",
147+
});
148+
}
149+
124150
private getMessageData(message: MessagePayload) {
125151
const result = ConcurrentMessageData.safeParse(message.data);
126152
if (result.success) {

0 commit comments

Comments
 (0)