Commit 26d480b

Brian Chen authored
fix: lower batch size on BulkWriter retry (#1549)
1 parent 8d97847 commit 26d480b

3 files changed: +80 -8 lines changed

dev/src/bulk-writer.ts

Lines changed: 35 additions & 4 deletions
```diff
@@ -56,6 +56,11 @@ import api = google.firestore.v1;
  */
 const MAX_BATCH_SIZE = 20;
 
+/*!
+ * The maximum number of writes that can be in a single batch that is being retried.
+ */
+export const RETRY_MAX_BATCH_SIZE = 10;
+
 /*!
  * The starting maximum number of operations per second as allowed by the
  * 500/50/5 rule.
```
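
The value 10 is not justified beyond the 10MiB comment added later in this diff, but it lines up with Firestore's 1 MiB per-document size limit: ten maximally sized writes sit right at a 10 MiB request. A back-of-the-envelope sketch (the constant names here are illustrative assumptions, not commit code):

```ts
// Sketch only: names are illustrative, not part of this commit.
const MAX_DOC_SIZE_BYTES = 1 * 1024 * 1024; // Firestore's 1 MiB document limit
const MAX_REQUEST_SIZE_BYTES = 10 * 1024 * 1024; // the 10 MiB cap cited below

// 10 writes x 1 MiB = 10 MiB, so a retried batch of 10 cannot exceed the cap.
console.log(10 * MAX_DOC_SIZE_BYTES <= MAX_REQUEST_SIZE_BYTES); // true
```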
```diff
@@ -213,6 +218,13 @@ class BulkCommitBatch extends WriteBatch {
   // been resolved.
   readonly pendingOps: Array<BulkWriterOperation> = [];
 
+  readonly maxBatchSize: number;
+
+  constructor(firestore: Firestore, maxBatchSize: number) {
+    super(firestore);
+    this.maxBatchSize = maxBatchSize;
+  }
+
   has(documentRef: firestore.DocumentReference<unknown>): boolean {
     return this.docPaths.has(documentRef.path);
   }
```
```diff
@@ -333,14 +345,17 @@ export class BulkWriter {
    * Visible for testing.
    * @private
    */
-  _maxBatchSize = MAX_BATCH_SIZE;
+  private _maxBatchSize = MAX_BATCH_SIZE;
 
   /**
    * The batch that is currently used to schedule operations. Once this batch
    * reaches maximum capacity, a new batch is created.
    * @private
    */
-  private _bulkCommitBatch = new BulkCommitBatch(this.firestore);
+  private _bulkCommitBatch = new BulkCommitBatch(
+    this.firestore,
+    this._maxBatchSize
+  );
 
   /**
    * A pointer to the tail of all active BulkWriter operations. This pointer
```
```diff
@@ -384,6 +399,16 @@ export class BulkWriter {
     return this._bufferedOperations.length;
   }
 
+  // Visible for testing.
+  _setMaxBatchSize(size: number): void {
+    assert(
+      this._bulkCommitBatch.pendingOps.length === 0,
+      'BulkCommitBatch should be empty'
+    );
+    this._maxBatchSize = size;
+    this._bulkCommitBatch = new BulkCommitBatch(this.firestore, size);
+  }
+
   /**
    * The maximum number of pending operations that can be enqueued onto this
    * BulkWriter instance. Once the this number of writes have been enqueued,
```
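
Note the assert in the new setter: swapping in a fresh BulkCommitBatch would silently drop any operations already enqueued on the old one, so the batch size may only be changed while the current batch is empty.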
```diff
@@ -840,7 +865,6 @@ export class BulkWriter {
     if (this._bulkCommitBatch._opCount === 0) return;
 
     const pendingBatch = this._bulkCommitBatch;
-    this._bulkCommitBatch = new BulkCommitBatch(this.firestore);
 
     // Use the write with the longest backoff duration when determining backoff.
     const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) =>
```
```diff
@@ -849,6 +873,13 @@
     const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration);
     const delayedExecution = new Deferred<void>();
 
+    // A backoff duration greater than 0 implies that this batch is a retry.
+    // Retried writes are sent with a batch size of 10 in order to guarantee
+    // that the batch is under the 10MiB limit.
+    const maxBatchSize =
+      highestBackoffDuration > 0 ? RETRY_MAX_BATCH_SIZE : this._maxBatchSize;
+    this._bulkCommitBatch = new BulkCommitBatch(this.firestore, maxBatchSize);
+
     if (backoffMsWithJitter > 0) {
       delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter);
     } else {
```
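
Stripped of the surrounding class machinery, the new selection logic reduces to a single ternary. A minimal self-contained sketch, with the `reduce` over `pendingOps` collapsed into a plain parameter:

```ts
const MAX_BATCH_SIZE = 20;
const RETRY_MAX_BATCH_SIZE = 10;

// A positive backoff means at least one write in the batch is a retry,
// so the next BulkCommitBatch is created with the smaller cap.
function nextBatchSize(highestBackoffDurationMs: number): number {
  return highestBackoffDurationMs > 0 ? RETRY_MAX_BATCH_SIZE : MAX_BATCH_SIZE;
}

console.log(nextBatchSize(0)); // 20: fresh batch
console.log(nextBatchSize(1500)); // 10: retry batch
```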
```diff
@@ -988,7 +1019,7 @@
     enqueueOnBatchCallback(this._bulkCommitBatch);
     this._bulkCommitBatch.processLastOperation(op);
 
-    if (this._bulkCommitBatch._opCount === this._maxBatchSize) {
+    if (this._bulkCommitBatch._opCount === this._bulkCommitBatch.maxBatchSize) {
       this._scheduleCurrentBatch();
     } else if (op.flushed) {
       // If flush() was called before this operation was enqueued into a batch,
```
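
From the caller's side nothing changes; the smaller batches only show up on the wire when writes are retried. A hedged usage sketch of the public BulkWriter API (project setup and document data are placeholders, not part of this commit):

```ts
import {Firestore} from '@google-cloud/firestore';

async function main(): Promise<void> {
  const firestore = new Firestore();
  const bulkWriter = firestore.bulkWriter();

  // Retries are what engage the new path: any write re-enqueued with a
  // backoff is committed in a batch capped at RETRY_MAX_BATCH_SIZE (10).
  bulkWriter.onWriteError(err => err.failedAttempts < 5);

  for (let i = 0; i < 15; i++) {
    bulkWriter
      .set(firestore.doc(`collectionId/doc${i}`), {foo: 'bar'})
      .catch(err => console.error('write failed permanently:', err));
  }

  await bulkWriter.close();
}

main();
```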

dev/test/bulk-writer.ts

Lines changed: 44 additions & 3 deletions
```diff
@@ -38,6 +38,7 @@ import {
   DEFAULT_INITIAL_OPS_PER_SECOND_LIMIT,
   DEFAULT_JITTER_FACTOR,
   DEFAULT_MAXIMUM_OPS_PER_SECOND_LIMIT,
+  RETRY_MAX_BATCH_SIZE,
 } from '../src/bulk-writer';
 import {
   ApiOverride,
```
```diff
@@ -576,7 +577,7 @@ describe('BulkWriter', () => {
       },
     ]);
     bulkWriter._setMaxPendingOpCount(6);
-    bulkWriter._maxBatchSize = 3;
+    bulkWriter._setMaxBatchSize(3);
     bulkWriter
       .set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
       .then(incrementOpCount);
```
```diff
@@ -822,6 +823,46 @@ describe('BulkWriter', () => {
     expect(timeoutHandlerCounter).to.equal(3);
   });
 
+  it('retries with smaller batch size', async () => {
+    const nLengthArray = (n: number): number[] => Array.from(Array(n).keys());
+
+    const bulkWriter = await instantiateInstance([
+      {
+        request: createRequest(
+          nLengthArray(15).map((_, i) => setOp('doc' + i, 'bar'))
+        ),
+        response: mergeResponses(
+          nLengthArray(15).map(() => failedResponse(Status.ABORTED))
+        ),
+      },
+      {
+        request: createRequest(
+          nLengthArray(RETRY_MAX_BATCH_SIZE).map((_, i) =>
+            setOp('doc' + i, 'bar')
+          )
+        ),
+        response: mergeResponses(
+          nLengthArray(RETRY_MAX_BATCH_SIZE).map(() => successResponse(1))
+        ),
+      },
+      {
+        request: createRequest(
+          nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map((_, i) =>
+            setOp('doc' + (i + RETRY_MAX_BATCH_SIZE), 'bar')
+          )
+        ),
+        response: mergeResponses(
+          nLengthArray(15 - RETRY_MAX_BATCH_SIZE).map(() => successResponse(1))
+        ),
+      },
+    ]);
+    for (let i = 0; i < 15; i++) {
+      bulkWriter.set(firestore.doc('collectionId/doc' + i), {foo: 'bar'});
+    }
+
+    await bulkWriter.close();
+  });
+
   it('retries maintain correct write resolution ordering', async () => {
     const bulkWriter = await instantiateInstance([
       {
```
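
The three stubbed request/response pairs encode the expected resharding: all 15 writes first go out in a single batch (the normal cap is 20) and fail with ABORTED, after which the retried writes are committed as one batch of RETRY_MAX_BATCH_SIZE writes (doc0 through doc9) followed by one batch of the remaining five (doc10 through doc14).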
```diff
@@ -910,7 +951,7 @@ describe('BulkWriter', () => {
       },
     ]);
 
-    bulkWriter._maxBatchSize = 2;
+    bulkWriter._setMaxBatchSize(2);
     for (let i = 0; i < 6; i++) {
       bulkWriter
         .set(firestore.doc('collectionId/doc' + i), {foo: 'bar'})
```
```diff
@@ -942,7 +983,7 @@ describe('BulkWriter', () => {
       },
     ]);
 
-    bulkWriter._maxBatchSize = 3;
+    bulkWriter._setMaxBatchSize(3);
     const promise1 = bulkWriter
       .set(firestore.doc('collectionId/doc1'), {foo: 'bar'})
       .then(incrementOpCount);
```

dev/test/recursive-delete.ts

Lines changed: 1 addition & 1 deletion
```diff
@@ -346,7 +346,7 @@ describe('recursiveDelete() method:', () => {
     const firestore = await createInstance(overrides);
 
     const bulkWriter = firestore.bulkWriter();
-    bulkWriter._maxBatchSize = maxBatchSize;
+    bulkWriter._setMaxBatchSize(maxBatchSize);
     await firestore._recursiveDelete(
       firestore.collection('root'),
       maxPendingOps,
```
