Skip to content

Commit b2798d9

Browse files
author
Kwabena Ampofo
authored
fix(NODE-4103): respect BSON options when creating change streams (#3247)
1 parent 1261432 commit b2798d9

File tree

2 files changed

+123
-4
lines changed

2 files changed

+123
-4
lines changed

src/change_stream.ts

+1-3
Original file line numberDiff line numberDiff line change
@@ -647,8 +647,6 @@ export class ChangeStream<
647647
}
648648
const pipeline = [{ $changeStream: changeStreamStageOptions }, ...this.pipeline];
649649

650-
const cursorOptions: ChangeStreamCursorOptions = filterOptions(options, CURSOR_OPTIONS);
651-
652650
const client: MongoClient | null =
653651
this.type === CHANGE_DOMAIN_TYPES.CLUSTER
654652
? (this.parent as MongoClient)
@@ -669,7 +667,7 @@ export class ChangeStream<
669667
client,
670668
this.namespace,
671669
pipeline,
672-
cursorOptions
670+
options
673671
);
674672

675673
for (const event of CHANGE_STREAM_EVENTS) {

test/integration/change-streams/change_stream.test.ts

+122-1
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import { strict as assert } from 'assert';
22
import { expect } from 'chai';
33
import * as crypto from 'crypto';
4+
import { once } from 'events';
45
import * as sinon from 'sinon';
56
import { PassThrough, Transform } from 'stream';
67

78
import {
89
ChangeStream,
910
ChangeStreamOptions,
1011
Collection,
12+
Db,
1113
Long,
1214
MongoClient,
1315
MongoNetworkError,
@@ -22,7 +24,7 @@ import {
2224
TestBuilder,
2325
UnifiedTestSuiteBuilder
2426
} from '../../tools/utils';
25-
import { delay, setupDatabase, withClient, withCursor } from '../shared';
27+
import { delay, filterForCommands, setupDatabase, withClient, withCursor } from '../shared';
2628

2729
function withChangeStream(
2830
callback: (collection: Collection, changeStream: ChangeStream, done: Mocha.Done) => void
@@ -1990,4 +1992,123 @@ describe('Change Streams', function () {
19901992
.toJSON()
19911993
)
19921994
.run();
1995+
1996+
describe('BSON Options', function () {
1997+
let client: MongoClient;
1998+
let db: Db;
1999+
let collection: Collection;
2000+
let cs: ChangeStream;
2001+
beforeEach(async function () {
2002+
client = await this.configuration.newClient({ monitorCommands: true }).connect();
2003+
db = client.db('db');
2004+
collection = await db.createCollection('collection');
2005+
});
2006+
afterEach(async function () {
2007+
await db.dropCollection('collection');
2008+
await cs.close();
2009+
await client.close();
2010+
client = undefined;
2011+
db = undefined;
2012+
collection = undefined;
2013+
});
2014+
2015+
context('promoteLongs', () => {
2016+
context('when set to true', () => {
2017+
it('does not convert Longs to numbers', {
2018+
metadata: { requires: { topology: '!single' } },
2019+
test: async function () {
2020+
cs = collection.watch([], { promoteLongs: true });
2021+
2022+
const willBeChange = once(cs, 'change').then(args => args[0]);
2023+
await once(cs.cursor, 'init');
2024+
2025+
const result = await collection.insertOne({ a: Long.fromNumber(0) });
2026+
expect(result).to.exist;
2027+
2028+
const change = await willBeChange;
2029+
2030+
expect(typeof change.fullDocument.a).to.equal('number');
2031+
}
2032+
});
2033+
});
2034+
2035+
context('when set to false', () => {
2036+
it('converts Long values to native numbers', {
2037+
metadata: { requires: { topology: '!single' } },
2038+
test: async function () {
2039+
cs = collection.watch([], { promoteLongs: false });
2040+
2041+
const willBeChange = once(cs, 'change').then(args => args[0]);
2042+
await once(cs.cursor, 'init');
2043+
2044+
const result = await collection.insertOne({ a: Long.fromNumber(0) });
2045+
expect(result).to.exist;
2046+
2047+
const change = await willBeChange;
2048+
expect(change).to.have.nested.property('fullDocument.a').that.is.instanceOf(Long);
2049+
}
2050+
});
2051+
});
2052+
2053+
context('when omitted', () => {
2054+
it('defaults to true', {
2055+
metadata: { requires: { topology: '!single' } },
2056+
test: async function () {
2057+
cs = collection.watch([]);
2058+
2059+
const willBeChange = once(cs, 'change').then(args => args[0]);
2060+
await once(cs.cursor, 'init');
2061+
2062+
const result = await collection.insertOne({ a: Long.fromNumber(0) });
2063+
expect(result).to.exist;
2064+
2065+
const change = await willBeChange;
2066+
expect(typeof change.fullDocument.a).to.equal('number');
2067+
}
2068+
});
2069+
});
2070+
});
2071+
2072+
context('invalid options', function () {
2073+
it('does not send invalid options on the aggregate command', {
2074+
metadata: { requires: { topology: '!single' } },
2075+
test: async function () {
2076+
const started = [];
2077+
2078+
client.on('commandStarted', filterForCommands(['aggregate'], started));
2079+
const doc = { invalidBSONOption: true };
2080+
cs = collection.watch([], doc);
2081+
2082+
const willBeChange = once(cs, 'change').then(args => args[0]);
2083+
await once(cs.cursor, 'init');
2084+
2085+
const result = await collection.insertOne({ a: Long.fromNumber(0) });
2086+
expect(result).to.exist;
2087+
2088+
await willBeChange;
2089+
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
2090+
}
2091+
});
2092+
2093+
it('does not send invalid options on the getMore command', {
2094+
metadata: { requires: { topology: '!single' } },
2095+
test: async function () {
2096+
const started = [];
2097+
2098+
client.on('commandStarted', filterForCommands(['aggregate'], started));
2099+
const doc = { invalidBSONOption: true };
2100+
cs = collection.watch([], doc);
2101+
2102+
const willBeChange = once(cs, 'change').then(args => args[0]);
2103+
await once(cs.cursor, 'init');
2104+
2105+
const result = await collection.insertOne({ a: Long.fromNumber(0) });
2106+
expect(result).to.exist;
2107+
2108+
await willBeChange;
2109+
expect(started[0].command).not.to.haveOwnProperty('invalidBSONOption');
2110+
}
2111+
});
2112+
});
2113+
});
19932114
});

0 commit comments

Comments
 (0)