Commit 92ea651

Author: Brian Chen
fix: recursive delete: backporting changes from Java (#1514)
1 parent 245c3a9 commit 92ea651

File tree: 3 files changed, +138 / −39 lines


dev/src/index.ts

Lines changed: 35 additions & 2 deletions
@@ -76,7 +76,11 @@ const serviceConfig = interfaces['google.firestore.v1.Firestore'];
 
 import api = google.firestore.v1;
 import {CollectionGroup} from './collection-group';
-import {RecursiveDelete} from './recursive-delete';
+import {
+  RECURSIVE_DELETE_MAX_PENDING_OPS,
+  RECURSIVE_DELETE_MIN_PENDING_OPS,
+  RecursiveDelete,
+} from './recursive-delete';
 
 export {
   CollectionReference,
@@ -1250,9 +1254,38 @@ export class Firestore implements firestore.Firestore {
       | firestore.CollectionReference<unknown>
       | firestore.DocumentReference<unknown>,
     bulkWriter?: BulkWriter
+  ): Promise<void> {
+    return this._recursiveDelete(
+      ref,
+      RECURSIVE_DELETE_MAX_PENDING_OPS,
+      RECURSIVE_DELETE_MIN_PENDING_OPS,
+      bulkWriter
+    );
+  }
+
+  /**
+   * This overload is not private in order to test the query resumption with
+   * startAfter() once the RecursiveDelete instance has MAX_PENDING_OPS pending.
+   *
+   * @private
+   */
+  // Visible for testing
+  _recursiveDelete(
+    ref:
+      | firestore.CollectionReference<unknown>
+      | firestore.DocumentReference<unknown>,
+    maxPendingOps: number,
+    minPendingOps: number,
+    bulkWriter?: BulkWriter
   ): Promise<void> {
     const writer = bulkWriter ?? this.getBulkWriter();
-    const deleter = new RecursiveDelete(this, writer, ref);
+    const deleter = new RecursiveDelete(
+      this,
+      writer,
+      ref,
+      maxPendingOps,
+      minPendingOps
+    );
     return deleter.run();
   }
 
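For orientation, the public recursiveDelete() signature is unchanged by this commit; it now simply forwards the default limits to the testable _recursiveDelete() overload. A minimal usage sketch of the public API (the '@google-cloud/firestore' entry point, the collection paths, and the retry policy below are illustrative, not part of this commit):

import {Firestore} from '@google-cloud/firestore';

async function purgeRoot(): Promise<void> {
  const firestore = new Firestore();

  // Deletes the 'root' collection, its documents, and all nested
  // subcollections, batching deletes through an internal BulkWriter.
  await firestore.recursiveDelete(firestore.collection('root'));

  // Optionally pass your own BulkWriter to observe progress or tune retries.
  const writer = firestore.bulkWriter();
  writer.onWriteError(err => err.failedAttempts < 3);
  await firestore.recursiveDelete(firestore.doc('root/config'), writer);
}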

dev/src/recursive-delete.ts

Lines changed: 37 additions & 17 deletions
@@ -48,7 +48,7 @@ export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';
  * from streaming documents faster than Firestore can delete.
  */
 // Visible for testing.
-export const MAX_PENDING_OPS = 5000;
+export const RECURSIVE_DELETE_MAX_PENDING_OPS = 5000;
 
 /**
  * The number of pending BulkWriter operations at which RecursiveDelete
@@ -57,7 +57,7 @@ export const MAX_PENDING_OPS = 5000;
  * throughput. This helps prevent BulkWriter from idling while Firestore
  * fetches the next query.
  */
-const MIN_PENDING_OPS = 1000;
+export const RECURSIVE_DELETE_MIN_PENDING_OPS = 1000;
 
 /**
  * Class used to store state required for running a recursive delete operation.
@@ -84,6 +84,25 @@ export class RecursiveDelete {
    */
   private documentsPending = true;
 
+  /**
+   * Whether run() has been called.
+   * @private
+   */
+  private started = false;
+
+  /**
+   * Query limit to use when fetching all descendants.
+   * @private
+   */
+  private readonly maxPendingOps: number;
+
+  /**
+   * The number of pending BulkWriter operations at which RecursiveDelete
+   * starts the next limit query to fetch descendants.
+   * @private
+   */
+  private readonly minPendingOps: number;
+
   /**
    * A deferred promise that resolves when the recursive delete operation
    * is completed.
@@ -119,25 +138,30 @@
    * @param firestore The Firestore instance to use.
    * @param writer The BulkWriter instance to use for delete operations.
    * @param ref The document or collection reference to recursively delete.
+   * @param maxLimit The query limit to use when fetching descendants
+   * @param minLimit The number of pending BulkWriter operations at which
+   * RecursiveDelete starts the next limit query to fetch descendants.
    */
   constructor(
     private readonly firestore: Firestore,
     private readonly writer: BulkWriter,
     private readonly ref:
       | firestore.CollectionReference<unknown>
-      | firestore.DocumentReference<unknown>
-  ) {}
+      | firestore.DocumentReference<unknown>,
+    private readonly maxLimit: number,
+    private readonly minLimit: number
+  ) {
+    this.maxPendingOps = maxLimit;
+    this.minPendingOps = minLimit;
+  }
 
   /**
    * Recursively deletes the reference provided in the class constructor.
   * Returns a promise that resolves when all descendants have been deleted, or
    * if an error occurs.
    */
   run(): Promise<void> {
-    assert(
-      this.documentsPending,
-      'The recursive delete operation has already been completed.'
-    );
+    assert(!this.started, 'RecursiveDelete.run() should only be called once.');
 
     // Capture the error stack to preserve stack tracing across async calls.
     this.errorStack = Error().stack!;
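The new assertion makes run() an explicit single-use entry point instead of reusing documentsPending for that check. A stripped-down sketch of that guard pattern (the class and method names here are made up for illustration, not the library's):

import * as assert from 'assert';

class OneShotOperation {
  private started = false;

  run(): Promise<void> {
    // Fail loudly on a second call instead of silently restarting work.
    assert(!this.started, 'run() should only be called once.');
    this.started = true;
    return this.execute();
  }

  private execute(): Promise<void> {
    return Promise.resolve();
  }
}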
@@ -152,12 +176,10 @@
    * @private
    */
   private setupStream(): void {
-    const limit = MAX_PENDING_OPS;
     const stream = this.getAllDescendants(
       this.ref instanceof CollectionReference
         ? (this.ref as CollectionReference<unknown>)
-        : (this.ref as DocumentReference<unknown>),
-      limit
+        : (this.ref as DocumentReference<unknown>)
     );
     this.streamInProgress = true;
     let streamedDocsCount = 0;
@@ -177,7 +199,7 @@
       this.streamInProgress = false;
       // If there are fewer than the number of documents specified in the
       // limit() field, we know that the query is complete.
-      if (streamedDocsCount < limit) {
+      if (streamedDocsCount < this.minPendingOps) {
         this.onQueryEnd();
       } else if (this.pendingOpsCount === 0) {
         this.setupStream();
@@ -188,13 +210,11 @@
   /**
    * Retrieves all descendant documents nested under the provided reference.
    * @param ref The reference to fetch all descendants for.
-   * @param limit The number of descendants to fetch in the query.
    * @private
    * @return {Stream<QueryDocumentSnapshot>} Stream of descendant documents.
    */
   private getAllDescendants(
-    ref: CollectionReference<unknown> | DocumentReference<unknown>,
-    limit: number
+    ref: CollectionReference<unknown> | DocumentReference<unknown>
   ): NodeJS.ReadableStream {
     // The parent is the closest ancestor document to the location we're
     // deleting. If we are deleting a document, the parent is the path of that
@@ -220,7 +240,7 @@
     );
 
     // Query for names only to fetch empty snapshots.
-    query = query.select(FieldPath.documentId()).limit(limit);
+    query = query.select(FieldPath.documentId()).limit(this.maxPendingOps);
 
     if (ref instanceof CollectionReference) {
       // To find all descendants of a collection reference, we need to use a
@@ -300,7 +320,7 @@
     if (
       this.documentsPending &&
       !this.streamInProgress &&
-      this.pendingOpsCount < MIN_PENDING_OPS
+      this.pendingOpsCount < this.minPendingOps
     ) {
       this.setupStream();
     }
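Taken together, these hunks route the two thresholds through the streaming logic: each descendant query is capped at maxPendingOps document names, and a new query is only started once the BulkWriter backlog drops below minPendingOps. A self-contained sketch of that windowed flow-control idea, with hypothetical fetchPage() and deleteDoc() callbacks standing in for the Firestore query stream and BulkWriter deletes:

// Simplified flow-control sketch; fetchPage() and deleteDoc() stand in for the
// paged descendant query and the BulkWriter delete calls used by RecursiveDelete.
async function recursiveDeleteSketch(
  fetchPage: (startAfter: string | undefined, limit: number) => Promise<string[]>,
  deleteDoc: (id: string) => Promise<void>,
  maxPendingOps = 5000,
  minPendingOps = 1000
): Promise<void> {
  let pendingOps = 0;
  let lastDocId: string | undefined;
  let done = false;

  while (!done) {
    // Fetch the next window of descendant document ids.
    const page = await fetchPage(lastDocId, maxPendingOps);
    if (page.length < maxPendingOps) {
      done = true; // A short page means the query is exhausted.
    }
    lastDocId = page[page.length - 1];

    // Enqueue deletes without awaiting each one individually.
    for (const id of page) {
      pendingOps++;
      void deleteDoc(id).finally(() => pendingOps--);
    }

    // Wait until the backlog drains below minPendingOps before issuing the
    // next query, so documents are not buffered faster than they are deleted.
    while (pendingOps >= minPendingOps) {
      await new Promise(resolve => setTimeout(resolve, 10));
    }
  }
}

The real class is event-driven rather than polling, but the two thresholds play the same roles as in this sketch.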

dev/test/recursive-delete.ts

Lines changed: 66 additions & 20 deletions
@@ -50,7 +50,11 @@ import {
 import {MAX_REQUEST_RETRIES} from '../src';
 
 import api = google.firestore.v1;
-import {MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete';
+import {
+  RECURSIVE_DELETE_MAX_PENDING_OPS,
+  REFERENCE_NAME_MIN_ID,
+} from '../src/recursive-delete';
+import {Deferred} from '../src/util';
 
 const PROJECT_ID = 'test-project';
 const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
@@ -140,7 +144,7 @@ describe('recursiveDelete() method:', () => {
             'LESS_THAN',
             endAt('root')
           ),
-          limit(MAX_PENDING_OPS)
+          limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
         );
         return stream();
       },
@@ -165,7 +169,7 @@
             'LESS_THAN',
             endAt('root/doc/nestedCol')
           ),
-          limit(MAX_PENDING_OPS)
+          limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
         );
         return stream();
       },
@@ -184,7 +188,7 @@
           'root/doc',
           select('__name__'),
           allDescendants(/* kindless= */ true),
-          limit(MAX_PENDING_OPS)
+          limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
         );
         return stream();
       },
@@ -222,7 +226,7 @@
             'LESS_THAN',
             endAt('root')
           ),
-          limit(MAX_PENDING_OPS)
+          limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
         );
         return stream();
       }
@@ -235,8 +239,32 @@
   });
 
   it('creates a second query with the correct startAfter', async () => {
-    const firstStream = Array.from(Array(MAX_PENDING_OPS).keys()).map(
-      (_, i) => result('doc' + i)
+    // This test checks that the second query is created with the correct
+    // startAfter() once the RecursiveDelete instance is below the
+    // MIN_PENDING_OPS threshold to send the next batch. Use lower limits
+    // than the actual RecursiveDelete class in order to make this test run fast.
+    const maxPendingOps = 100;
+    const minPendingOps = 11;
+    const maxBatchSize = 10;
+    const cutoff = maxPendingOps - minPendingOps;
+    let numDeletesBuffered = 0;
+
+    // This deferred promise is used to delay the BatchWriteResponses from
+    // returning in order to create the situation where the number of pending
+    // operations is less than `minPendingOps`.
+    const bufferDeferred = new Deferred<void>();
+
+    // This deferred completes when the second query is run.
+    const secondQueryDeferred = new Deferred<void>();
+
+    const nLengthArray = (n: number): number[] => Array.from(Array(n).keys());
+
+    const firstStream = nLengthArray(maxPendingOps).map((_, i) =>
+      result('doc' + i)
+    );
+
+    const batchWriteResponse = mergeResponses(
+      nLengthArray(maxBatchSize).map(() => successResponse(1))
     );
 
     // Use an array to store that the queryEquals() method succeeded, since
@@ -257,7 +285,7 @@
             'LESS_THAN',
             endAt('root')
           ),
-          limit(MAX_PENDING_OPS)
+          limit(maxPendingOps)
         );
         called.push(1);
         return stream(...firstStream);
@@ -279,34 +307,52 @@
              referenceValue:
                `projects/${PROJECT_ID}/databases/(default)/` +
                'documents/collectionId/doc' +
-                (MAX_PENDING_OPS - 1),
+                (maxPendingOps - 1),
            }),
-          limit(MAX_PENDING_OPS)
+          limit(maxPendingOps)
         );
         called.push(2);
+        secondQueryDeferred.resolve();
         return stream();
       } else {
         called.push(3);
         return stream();
       }
     },
     batchWrite: () => {
-      const responses = mergeResponses(
-        Array.from(Array(500).keys()).map(() => successResponse(1))
-      );
-      return response({
-        writeResults: responses.writeResults,
-        status: responses.status,
+      const returnedResponse = response({
+        writeResults: batchWriteResponse.writeResults,
+        status: batchWriteResponse.status,
       });
+      if (numDeletesBuffered < cutoff) {
+        numDeletesBuffered += batchWriteResponse.writeResults!.length;
+
+        // By waiting for `bufferFuture` to complete, we can guarantee that
+        // the writes complete after all documents are streamed. Without
+        // this future, the test can race and complete the writes before
+        // the stream is finished, which is a different scenario this test
+        // is not for.
+        return bufferDeferred.promise.then(() => returnedResponse);
+      } else {
+        // Once there are `cutoff` pending deletes, completing the future
+        // allows enough responses to be returned such that the number of
+        // pending deletes should be less than `minPendingOps`. This allows
+        // us to test that the second query is made.
+        bufferDeferred.resolve();
+        return secondQueryDeferred.promise.then(() => returnedResponse);
+      }
     },
   };
   const firestore = await createInstance(overrides);
 
-  // Use a custom batch size with BulkWriter to simplify the dummy
-  // batchWrite() response logic.
   const bulkWriter = firestore.bulkWriter();
-  bulkWriter._maxBatchSize = 500;
-  await firestore.recursiveDelete(firestore.collection('root'), bulkWriter);
+  bulkWriter._maxBatchSize = maxBatchSize;
+  await firestore._recursiveDelete(
+    firestore.collection('root'),
+    maxPendingOps,
+    minPendingOps,
+    bulkWriter
+  );
   expect(called).to.deep.equal([1, 2]);
   });
 });
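The test sequences the mocked batchWrite responses against the streamed query results using the repo's Deferred utility (imported above from '../src/util'). A simplified sketch of that kind of promise-with-exposed-resolve helper, and how a test can use it to gate a mocked RPC:

// Minimal promise-with-exposed-resolve helper, similar in spirit to the
// Deferred<T> the test imports from '../src/util'.
class Deferred<T> {
  promise: Promise<T>;
  resolve: (value: T) => void = () => {};
  reject: (reason?: unknown) => void = () => {};

  constructor() {
    this.promise = new Promise<T>((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });
  }
}

// Usage: hold a mocked RPC open until the test decides to release it.
async function example(): Promise<void> {
  const gate = new Deferred<void>();
  const mockedRpc = () => gate.promise.then(() => 'response');

  const pending = mockedRpc(); // stays pending...
  gate.resolve();              // ...until the test releases it
  console.log(await pending);  // 'response'
}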
