Skip to content

Commit 39aad76

Browse files
authored
improvement(container-runtime): Pass all resubmit-related info in a single parameter when flushing a resubmitted batch (#24516)
Prerequisite refactor needed for #24510 This lets all the flush code know whether this flush is for resubmit or not by checking for the existence of that `BatchResubmitInfo` param.
1 parent 4449d0a commit 39aad76

File tree

4 files changed

+45
-41
lines changed

4 files changed

+45
-41
lines changed

packages/runtime/container-runtime/src/containerRuntime.ts

+8-10
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ import {
210210
Outbox,
211211
RemoteMessageProcessor,
212212
type OutboundBatch,
213+
type BatchResubmitInfo,
213214
} from "./opLifecycle/index.js";
214215
import { pkgVersion } from "./packageVersion.js";
215216
import {
@@ -3190,23 +3191,20 @@ export class ContainerRuntime
31903191
}
31913192

31923193
/**
3193-
* Flush the pending ops manually.
3194-
* This method is expected to be called at the end of a batch.
3194+
* Flush the current batch of ops to the ordering service for sequencing
3195+
* This method is not expected to be called in the middle of a batch.
31953196
* @remarks - If it throws (e.g. if the batch is too large to send), the container will be closed.
31963197
*
3197-
* @param resubmittingBatchId - If defined, indicates this is a resubmission of a batch
3198-
* with the given Batch ID, which must be preserved
3199-
* @param resubmittingStagedBatch - If defined, indicates this is a resubmission of a batch that is staged,
3200-
* meaning it should not be sent to the ordering service yet.
3198+
* @param resubmitInfo - If defined, indicates this is a resubmission of a batch with the given Batch info needed for resubmit.
32013199
*/
3202-
private flush(resubmittingBatchId?: BatchId, resubmittingStagedBatch?: boolean): void {
3200+
private flush(resubmitInfo?: BatchResubmitInfo): void {
32033201
try {
32043202
assert(
32053203
!this.batchRunner.running,
32063204
0x24c /* "Cannot call `flush()` while manually accumulating a batch (e.g. under orderSequentially) */,
32073205
);
32083206

3209-
this.outbox.flush(resubmittingBatchId, resubmittingStagedBatch);
3207+
this.outbox.flush(resubmitInfo);
32103208
assert(this.outbox.isEmpty, 0x3cf /* reentrancy */);
32113209
} catch (error) {
32123210
const error2 = normalizeError(error, {
@@ -3320,7 +3318,7 @@ export class ContainerRuntime
33203318
this.outbox.flush();
33213319
const exitStagingMode = (discardOrCommit: () => void) => (): void => {
33223320
// Final flush of any last staged changes
3323-
this.outbox.flush(undefined, true /* staged */);
3321+
this.outbox.flush();
33243322

33253323
this.stageControls = undefined;
33263324

@@ -4558,7 +4556,7 @@ export class ContainerRuntime
45584556

45594557
// Only include Batch ID if "Offline Load" feature is enabled
45604558
// It's only needed to identify batches across container forks arising from misuse of offline load.
4561-
this.flush(this.offlineEnabled ? batchId : undefined, staged);
4559+
this.flush({ batchId: this.offlineEnabled ? batchId : undefined, staged });
45624560
}
45634561

45644562
private reSubmit(message: PendingMessageResubmitData): void {

packages/runtime/container-runtime/src/opLifecycle/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export {
2727
ensureContentsDeserialized,
2828
} from "./opSerialization.js";
2929
export {
30+
BatchResubmitInfo,
3031
estimateSocketSize,
3132
localBatchToOutboundBatch,
3233
Outbox,

packages/runtime/container-runtime/src/opLifecycle/outbox.ts

+32-27
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,23 @@ export interface IOutboxParameters {
7070
readonly opReentrancy: () => boolean;
7171
}
7272

73+
/**
74+
* Info needed to correctly resubmit a batch
75+
*/
76+
export interface BatchResubmitInfo {
77+
/**
78+
* If defined, indicates the Batch ID of the batch being resubmitted.
79+
* This must be preserved on the new batch about to be submitted so they can be correlated/deduped in case both are sent.
80+
*/
81+
batchId?: string;
82+
/**
83+
* Indicates whether or not this batch is "staged", meaning it should not be sent to the ordering service yet
84+
* This is important on resubmit because we may be in Staging Mode for new changes,
85+
* but resubmitting a non-staged change from before entering Staging Mode
86+
*/
87+
staged: boolean;
88+
}
89+
7390
/**
7491
* Temporarily increase the stack limit while executing the provided action.
7592
* If a negative value is provided for `length`, no stack frames will be collected.
@@ -335,37 +352,33 @@ export class Outbox {
335352
* This method is expected to be called at the end of a batch.
336353
*
337354
* @throws If called from a reentrant context, or if the batch being flushed is too large.
338-
* @param resubmittingBatchId - If defined, indicates this is a resubmission of a batch
339-
* with the given Batch ID, which must be preserved
340-
* @param resubmittingStagedBatch - If defined, indicates this is a resubmission of a batch that is staged,
341-
* meaning it should not be sent to the ordering service yet.
355+
* @param resubmitInfo - Key information when flushing a resubmitted batch. Undefined means this is not resubmit.
342356
*/
343-
public flush(resubmittingBatchId?: BatchId, resubmittingStagedBatch?: boolean): void {
357+
public flush(resubmitInfo?: BatchResubmitInfo): void {
344358
assert(
345359
!this.isContextReentrant(),
346360
0xb7b /* Flushing must not happen while incoming changes are being processed */,
347361
);
348-
349-
this.flushAll(resubmittingBatchId, resubmittingStagedBatch);
362+
this.flushAll(resubmitInfo);
350363
}
351364

352-
private flushAll(resubmittingBatchId?: BatchId, resubmittingStagedBatch?: boolean): void {
365+
private flushAll(resubmitInfo?: BatchResubmitInfo): void {
353366
const allBatchesEmpty =
354367
this.idAllocationBatch.empty && this.blobAttachBatch.empty && this.mainBatch.empty;
355368
if (allBatchesEmpty) {
356-
// If we're resubmitting and all batches are empty, we need to flush an empty batch.
357-
// Note that we currently resubmit one batch at a time, so on resubmit, 2 of the 3 batches will *always* be empty.
369+
// If we're resubmitting with a batchId and all batches are empty, we need to flush an empty batch.
370+
// Note that we currently resubmit one batch at a time, so on resubmit, 1 of the 2 batches will *always* be empty.
358371
// It's theoretically possible that we don't *need* to resubmit this empty batch, and in those cases, it'll safely be ignored
359372
// by the rest of the system, including remote clients.
360373
// In some cases we *must* resubmit the empty batch (to match up with a non-empty version tracked locally by a container fork), so we do it always.
361-
if (resubmittingBatchId) {
362-
this.flushEmptyBatch(resubmittingBatchId, resubmittingStagedBatch === true);
374+
if (resubmitInfo?.batchId !== undefined) {
375+
this.flushEmptyBatch(resubmitInfo.batchId, resubmitInfo.staged);
363376
}
364377
return;
365378
}
366379

367380
// Don't use resubmittingBatchId for idAllocationBatch.
368-
// ID Allocation messages are not directly resubmitted so we don't want to reuse the batch ID.
381+
// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
369382
this.flushInternal({
370383
batchManager: this.idAllocationBatch,
371384
// Note: For now, we will never stage ID Allocation messages.
@@ -374,13 +387,11 @@ export class Outbox {
374387
this.flushInternal({
375388
batchManager: this.blobAttachBatch,
376389
disableGroupedBatching: true,
377-
resubmittingBatchId,
378-
resubmittingStagedBatch,
390+
resubmitInfo,
379391
});
380392
this.flushInternal({
381393
batchManager: this.mainBatch,
382-
resubmittingBatchId,
383-
resubmittingStagedBatch,
394+
resubmitInfo,
384395
});
385396
}
386397

@@ -416,25 +427,19 @@ export class Outbox {
416427
private flushInternal(params: {
417428
batchManager: BatchManager;
418429
disableGroupedBatching?: boolean;
419-
resubmittingBatchId?: BatchId; // undefined if not resubmitting
420-
resubmittingStagedBatch?: boolean; // undefined if not resubmitting
430+
resubmitInfo?: BatchResubmitInfo; // undefined if not resubmitting
421431
}): void {
422-
const {
423-
batchManager,
424-
disableGroupedBatching = false,
425-
resubmittingBatchId,
426-
resubmittingStagedBatch,
427-
} = params;
432+
const { batchManager, disableGroupedBatching = false, resubmitInfo } = params;
428433
if (batchManager.empty) {
429434
return;
430435
}
431436

432-
const rawBatch = batchManager.popBatch(resubmittingBatchId);
437+
const rawBatch = batchManager.popBatch(resubmitInfo?.batchId);
433438

434439
// When resubmitting, we respect the staged state of the original batch.
435440
// In this case rawBatch.staged will match the state of inStagingMode when
436441
// the resubmit occurred, which is not relevant.
437-
const staged = resubmittingStagedBatch ?? rawBatch.staged === true;
442+
const staged = resubmitInfo?.staged ?? rawBatch.staged === true;
438443

439444
const groupingEnabled =
440445
!disableGroupedBatching && this.params.groupingManager.groupedBatchingEnabled();

packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ describe("Outbox", () => {
405405
assert.equal(state.pendingOpContents.length, 0);
406406
const batchId = "batchId";
407407
// ...But if batchId is provided, it's resubmit, and we need to send an empty batch with the batchId
408-
outbox.flush(batchId);
408+
outbox.flush({ batchId, staged: false });
409409
assert.equal(state.opsSubmitted, 1);
410410
assert.equal(state.batchesSubmitted.length, 1);
411411
assert.equal(
@@ -430,17 +430,17 @@ describe("Outbox", () => {
430430
outbox.submitIdAllocation(createMessage(ContainerMessageType.IdAllocation, "0")); // Separate batch, batch ID not used
431431
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "1"));
432432
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "2"));
433-
outbox.flush("batchId-A");
433+
outbox.flush({ batchId: "batchId-A", staged: false });
434434

435435
// Flush 2 - resubmit single-message batch
436436
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "3"));
437-
outbox.flush("batchId-B");
437+
outbox.flush({ batchId: "batchId-B", staged: false });
438438

439439
// Flush 3 - resubmit blob attach batch
440440
outbox.submitBlobAttach(createMessage(ContainerMessageType.BlobAttach, "4"));
441441
outbox.submitBlobAttach(createMessage(ContainerMessageType.BlobAttach, "5"));
442442
currentSeqNumbers.referenceSequenceNumber = 0;
443-
outbox.flush("batchId-C");
443+
outbox.flush({ batchId: "batchId-C", staged: false });
444444

445445
// Flush 4 - no batch ID given
446446
outbox.submit(createMessage(ContainerMessageType.FluidDataStoreOp, "6"));

0 commit comments

Comments
 (0)