Skip to content

Commit a5b680d

Browse files
authored
fix: fix duplicates in Query.stream() with back pressure (#1806)
1 parent 0dbcb13 commit a5b680d

File tree

3 files changed

+183
-14
lines changed

3 files changed

+183
-14
lines changed

dev/src/reference.ts

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import * as firestore from '@google-cloud/firestore';
1818
import {Duplex, Readable, Transform} from 'stream';
1919
import * as deepEqual from 'fast-deep-equal';
20+
import {GoogleError} from 'google-gax';
2021

2122
import * as protos from '../protos/firestore_v1_proto_api';
2223

@@ -92,6 +93,8 @@ const comparisonOperators: {
9293
'array-contains-any': 'ARRAY_CONTAINS_ANY',
9394
};
9495

96+
const NOOP_MESSAGE = Symbol('a noop message');
97+
9598
/**
9699
* onSnapshot() callback that receives a QuerySnapshot.
97100
*
@@ -2268,6 +2271,11 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
22682271
return structuredQuery;
22692272
}
22702273

2274+
// This method exists solely to enable unit tests to mock it.
2275+
_isPermanentRpcError(err: GoogleError, methodName: string): boolean {
2276+
return isPermanentRpcError(err, methodName);
2277+
}
2278+
22712279
/**
22722280
* Internal streaming method that accepts an optional transaction ID.
22732281
*
@@ -2285,6 +2293,10 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
22852293
const stream = new Transform({
22862294
objectMode: true,
22872295
transform: (proto, enc, callback) => {
2296+
if (proto === NOOP_MESSAGE) {
2297+
callback(undefined);
2298+
return;
2299+
}
22882300
const readTime = Timestamp.fromProto(proto.readTime);
22892301
if (proto.document) {
22902302
const document = this.firestore.snapshot_(
@@ -2337,27 +2349,33 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
23372349

23382350
// If a non-transactional query failed, attempt to restart.
23392351
// Transactional queries are retried via the transaction runner.
2340-
if (!transactionId && !isPermanentRpcError(err, 'runQuery')) {
2352+
if (!transactionId && !this._isPermanentRpcError(err, 'runQuery')) {
23412353
logger(
23422354
'Query._stream',
23432355
tag,
23442356
'Query failed with retryable stream error:',
23452357
err
23462358
);
2347-
if (lastReceivedDocument) {
2348-
// Restart the query but use the last document we received as the
2349-
// query cursor. Note that we do not use backoff here. The call to
2350-
// `requestStream()` will backoff should the restart fail before
2351-
// delivering any results.
2352-
if (this._queryOptions.requireConsistency) {
2353-
request = this.startAfter(lastReceivedDocument).toProto(
2354-
lastReceivedDocument.readTime
2355-
);
2356-
} else {
2357-
request = this.startAfter(lastReceivedDocument).toProto();
2359+
// Enqueue a "no-op" write into the stream and resume the query
2360+
// once it is processed. This allows any enqueued results to be
2361+
// consumed before resuming the query so that the query resumption
2362+
// can start at the correct document.
2363+
stream.write(NOOP_MESSAGE, () => {
2364+
if (lastReceivedDocument) {
2365+
// Restart the query but use the last document we received as
2366+
// the query cursor. Note that we do not use backoff here. The
2367+
// call to `requestStream()` will backoff should the restart
2368+
// fail before delivering any results.
2369+
if (this._queryOptions.requireConsistency) {
2370+
request = this.startAfter(lastReceivedDocument).toProto(
2371+
lastReceivedDocument.readTime
2372+
);
2373+
} else {
2374+
request = this.startAfter(lastReceivedDocument).toProto();
2375+
}
23582376
}
2359-
}
2360-
streamActive.resolve(/* active= */ true);
2377+
streamActive.resolve(/* active= */ true);
2378+
});
23612379
} else {
23622380
logger(
23632381
'Query._stream',

dev/test/query.ts

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {DocumentSnapshot, DocumentSnapshotBuilder} from '../src/document';
3535
import {QualifiedResourcePath} from '../src/path';
3636
import {
3737
ApiOverride,
38+
collect,
3839
createInstance,
3940
document,
4041
InvalidApiUsage,
@@ -49,8 +50,10 @@ import {
4950
writeResult,
5051
} from './util/helpers';
5152

53+
import {GoogleError} from 'google-gax';
5254
import api = google.firestore.v1;
5355
import protobuf = google.protobuf;
56+
import {Deferred} from '../src/util';
5457

5558
const PROJECT_ID = 'test-project';
5659
const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
@@ -2507,3 +2510,120 @@ describe('collectionGroup queries', () => {
25072510
});
25082511
});
25092512
});
2513+
2514+
describe('query resumption', () => {
2515+
let firestore: Firestore;
2516+
2517+
beforeEach(() => {
2518+
setTimeoutHandler(setImmediate);
2519+
return createInstance().then(firestoreInstance => {
2520+
firestore = firestoreInstance;
2521+
});
2522+
});
2523+
2524+
afterEach(async () => {
2525+
await verifyInstance(firestore);
2526+
setTimeoutHandler(setTimeout);
2527+
});
2528+
2529+
// Prevent regression of
2530+
// https://github.com/googleapis/nodejs-firestore/issues/1790
2531+
it('results should not be double produced on retryable error with back pressure', async () => {
2532+
// Generate the IDs of the documents that will match the query.
2533+
const documentIds = Array.from(new Array(500), (_, index) => `doc${index}`);
2534+
2535+
// Finds the index in `documentIds` of the document referred to in the
2536+
// "startAt" of the given request.
2537+
function getStartAtDocumentIndex(
2538+
request: api.IRunQueryRequest
2539+
): number | null {
2540+
const startAt = request.structuredQuery?.startAt;
2541+
const startAtValue = startAt?.values?.[0]?.referenceValue;
2542+
const startAtBefore = startAt?.before;
2543+
if (typeof startAtValue !== 'string') {
2544+
return null;
2545+
}
2546+
const docId = startAtValue.split('/').pop()!;
2547+
const docIdIndex = documentIds.indexOf(docId);
2548+
if (docIdIndex < 0) {
2549+
return null;
2550+
}
2551+
return startAtBefore ? docIdIndex : docIdIndex + 1;
2552+
}
2553+
2554+
const RETRYABLE_ERROR_DOMAIN = 'RETRYABLE_ERROR_DOMAIN';
2555+
2556+
// A mock replacement for Query._isPermanentRpcError which (a) resolves
2557+
// a promise once invoked and (b) treats a specific error "domain" as
2558+
// non-retryable.
2559+
function mockIsPermanentRpcError(err: GoogleError): boolean {
2560+
mockIsPermanentRpcError.invoked.resolve(true);
2561+
return err?.domain !== RETRYABLE_ERROR_DOMAIN;
2562+
}
2563+
mockIsPermanentRpcError.invoked = new Deferred();
2564+
2565+
// Return the first half of the documents, followed by a retryable error.
2566+
function* getRequest1Responses(): Generator<api.IRunQueryResponse | Error> {
2567+
const runQueryResponses = documentIds
2568+
.slice(0, documentIds.length / 2)
2569+
.map(documentId => result(documentId));
2570+
for (const runQueryResponse of runQueryResponses) {
2571+
yield runQueryResponse;
2572+
}
2573+
const retryableError = new GoogleError('simulated retryable error');
2574+
retryableError.domain = RETRYABLE_ERROR_DOMAIN;
2575+
yield retryableError;
2576+
}
2577+
2578+
// Return the remaining documents.
2579+
function* getRequest2Responses(
2580+
request: api.IRunQueryRequest
2581+
): Generator<api.IRunQueryResponse> {
2582+
const startAtDocumentIndex = getStartAtDocumentIndex(request);
2583+
if (startAtDocumentIndex === null) {
2584+
throw new Error('request #2 should specify a valid startAt');
2585+
}
2586+
const runQueryResponses = documentIds
2587+
.slice(startAtDocumentIndex)
2588+
.map(documentId => result(documentId));
2589+
for (const runQueryResponse of runQueryResponses) {
2590+
yield runQueryResponse;
2591+
}
2592+
}
2593+
2594+
// Set up the mocked responses from Watch.
2595+
let requestNum = 0;
2596+
const overrides: ApiOverride = {
2597+
runQuery: request => {
2598+
requestNum++;
2599+
switch (requestNum) {
2600+
case 1:
2601+
return stream(...getRequest1Responses());
2602+
case 2:
2603+
return stream(...getRequest2Responses(request!));
2604+
default:
2605+
throw new Error(`should never get here (requestNum=${requestNum})`);
2606+
}
2607+
},
2608+
};
2609+
2610+
// Create an async iterator to get the result set but DO NOT iterate over
2611+
// it immediately. Instead, allow the responses to pile up and fill the
2612+
// buffers. Once isPermanentError() is invoked, indicating that the first
2613+
// request has failed and is about to be retried, collect the results from
2614+
// the async iterator into an array.
2615+
firestore = await createInstance(overrides);
2616+
const query = firestore.collection('collectionId');
2617+
query._isPermanentRpcError = mockIsPermanentRpcError;
2618+
const iterator = query
2619+
.stream()
2620+
[Symbol.asyncIterator]() as AsyncIterator<QueryDocumentSnapshot>;
2621+
await mockIsPermanentRpcError.invoked.promise;
2622+
const snapshots = await collect(iterator);
2623+
2624+
// Verify that the async iterator returned the correct documents and,
2625+
// especially, does not have duplicate results.
2626+
const actualDocumentIds = snapshots.map(snapshot => snapshot.id);
2627+
expect(actualDocumentIds).to.eql(documentIds);
2628+
});
2629+
});

dev/test/util/helpers.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,3 +413,34 @@ export async function bundleToElementArray(
413413
}
414414
return result;
415415
}
416+
417+
/**
418+
* Reads the elements of an AsyncIterator.
419+
*
420+
* Example:
421+
*
422+
* const query = firestore.collection('collectionId');
423+
* const iterator = query.stream()[Symbol.asyncIterator]()
424+
* as AsyncIterator<QueryDocumentSnapshot>;
425+
* return collect(iterator).then(snapshots => {
426+
* expect(snapshots).to.have.length(2);
427+
* });
428+
*
429+
* @param iterator the iterator whose elements over which to iterate.
430+
* @return a Promise that is fulfilled with the elements that were produced, or
431+
* is rejected with the cause of the first failed iteration.
432+
*/
433+
export async function collect<T, TReturn, TNext>(
434+
iterator: AsyncIterator<T, TReturn, TNext>
435+
): Promise<Array<T>> {
436+
const values: Array<T> = [];
437+
// eslint-disable-next-line no-constant-condition
438+
while (true) {
439+
const {done, value} = await iterator.next();
440+
if (done) {
441+
break;
442+
}
443+
values.push(value);
444+
}
445+
return values;
446+
}

0 commit comments

Comments
 (0)