
Commit 650eb54

test(ChangeStream): update changeStream spec tests / test runner
- update changeStream spec tests
- update spec-test runner to comply with tests
- Adds a default value for expectations array in case there are no expectations
- Parses expected values through Extended JSON to ensure that they are in canonical format
- fix result assertion in change stream spec test
- get rid of EJSONToJSON
- add fail point to spec tests
- Skips a test that will not pass until later features are added
- fix flakey test in Node 10.16
- fix race condition in test runner

Fixes NODE-2017

Parent: a487be4

7 files changed: +395 -43 lines
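
The "fix race condition in test runner" item refers to the allSettled helper added to change_stream_spec_tests.js below: unlike a bare Promise.all, it waits for every promise to settle before rethrowing the first error, so a rejected operation can no longer fail the test while the change stream read is still in flight. A minimal standalone sketch of the pattern (the helper mirrors the one in the diff; the example promises are invented):

```js
// Sketch of the allSettled-then-throw pattern used by the updated runner.
// Promise.all would reject as soon as `failing` rejects, possibly before
// `slow` has finished; this helper waits for both, then rethrows the first error.
function allSettled(promises) {
  let err;
  return Promise.all(promises.map(p => p.catch(e => (err = err || e)))).then(results => {
    if (err) throw err;
    return results;
  });
}

const slow = new Promise(resolve => setTimeout(() => resolve('slow done'), 100));
const failing = Promise.reject(new Error('operation failed'));

allSettled([slow, failing]).catch(e => {
  // Reached only after `slow` has settled, so no work is still in flight.
  console.error(e.message); // "operation failed"
});
```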

test/functional/change_stream_spec_tests.js (+34 -10)

```diff
@@ -9,12 +9,12 @@ const delay = require('./shared').delay;
 const expect = chai.expect;
 
 describe('Change Stream Spec', function() {
-  const EJSONToJSON = x => JSON.parse(EJSON.stringify(x));
-
   let globalClient;
   let ctx;
   let events;
 
+  const TESTS_TO_SKIP = new Set(['Test consecutive resume']);
+
   before(function() {
     const configuration = this.configuration;
     return setupDatabase(configuration).then(() => {
@@ -34,7 +34,7 @@ describe('Change Stream Spec', function() {
     .filter(filename => filename.match(/\.json$/))
     .forEach(filename => {
       const specString = fs.readFileSync(`${__dirname}/spec/change-stream/${filename}`, 'utf8');
-      const specData = JSON.parse(specString);
+      const specData = EJSON.parse(specString, { relaxed: true });
 
       const ALL_DBS = [specData.database_name, specData.database2_name];
 
@@ -64,11 +64,14 @@ describe('Change Stream Spec', function() {
        ctx = undefined;
        events = undefined;
 
+       client.removeAllListeners('commandStarted');
+
        return client && client.close();
      });
 
      specData.tests.forEach(test => {
-       const itFn = test.skip ? it.skip : test.only ? it.only : it;
+       const shouldSkip = test.skip || TESTS_TO_SKIP.has(test.description);
+       const itFn = shouldSkip ? it.skip : test.only ? it.only : it;
        const metadata = generateMetadata(test);
        const testFn = generateTestFn(test);
 
@@ -94,18 +97,30 @@ describe('Change Stream Spec', function() {
      }
 
      function generateTestFn(test) {
+       const configureFailPoint = makeFailPointCommand(test);
        const testFnRunOperations = makeTestFnRunOperations(test);
        const testSuccess = makeTestSuccess(test);
        const testFailure = makeTestFailure(test);
        const testAPM = makeTestAPM(test);
 
        return function testFn() {
-         return testFnRunOperations(ctx)
+         return configureFailPoint(ctx)
+           .then(() => testFnRunOperations(ctx))
            .then(testSuccess, testFailure)
            .then(() => testAPM(ctx, events));
        };
      }
 
+     function makeFailPointCommand(test) {
+       if (!test.failPoint) {
+         return () => Promise.resolve();
+       }
+
+       return function(ctx) {
+         return ctx.gc.db('admin').command(test.failPoint);
+       };
+     }
+
      function makeTestSuccess(test) {
        const result = test.result;
 
@@ -115,7 +130,7 @@ describe('Change Stream Spec', function() {
          }
 
          if (result.success) {
-           value = EJSONToJSON(value);
+           expect(value).to.have.a.lengthOf(result.success.length);
            assertEquality(value, result.success);
          }
        };
@@ -134,7 +149,7 @@ describe('Change Stream Spec', function() {
      }
 
      function makeTestAPM(test) {
-       const expectedEvents = test.expectations;
+       const expectedEvents = test.expectations || [];
 
        return function testAPM(ctx, events) {
          expectedEvents
@@ -146,12 +161,21 @@ describe('Change Stream Spec', function() {
                `Expected there to be an APM event at index ${idx}, but there was none`
              );
            }
-           const actual = EJSONToJSON(events[idx]);
-           assertEquality(actual, expected);
+           assertEquality(events[idx], expected);
          });
        };
      }
 
+     function allSettled(promises) {
+       let err;
+       return Promise.all(promises.map(p => p.catch(x => (err = err || x)))).then(args => {
+         if (err) {
+           throw err;
+         }
+         return args;
+       });
+     }
+
      function makeTestFnRunOperations(test) {
        const target = test.target;
        const operations = test.operations;
@@ -165,7 +189,7 @@ describe('Change Stream Spec', function() {
        const changeStreamPromise = readAndCloseChangeStream(ctx.changeStream, success.length);
        const operationsPromise = runOperations(ctx.gc, operations);
 
-       return Promise.all([changeStreamPromise, operationsPromise]).then(args => args[0]);
+       return allSettled([changeStreamPromise, operationsPromise]).then(args => args[0]);
      };
    }
 
```
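
The runner now parses spec files with EJSON.parse(specString, { relaxed: true }) instead of JSON.parse. A small illustration of why, assuming the EJSON export from the bson package (the import path is an assumption; the test file gets EJSON from its own requires):

```js
// Illustration only: how Extended JSON type wrappers behave under the two parsers.
const { EJSON } = require('bson');

const specString = '{ "fullDocument": { "x": { "$numberInt": "1" } } }';

const viaJSON = JSON.parse(specString);
console.log(viaJSON.fullDocument.x); // { '$numberInt': '1' } -- the wrapper survives

const viaEJSON = EJSON.parse(specString, { relaxed: true });
console.log(viaEJSON.fullDocument.x); // 1 -- a native number, directly comparable to driver output
```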

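makeFailPointCommand above only enables the fail point described by test.failPoint via the admin database. A hedged sketch of the general pattern, including the conventional mode "off" cleanup that this commit leaves to times-based modes; the failCommand payload below is an example, not one taken from these spec files:

```js
// Sketch of applying and clearing a server fail point, mirroring what
// makeFailPointCommand does with test.failPoint.
function enableFailPoint(client, failPoint) {
  return client.db('admin').command(failPoint);
}

function disableFailPoint(client, failPointName) {
  // Conventional cleanup; this commit relies on `times`-based modes expiring instead.
  return client.db('admin').command({ configureFailPoint: failPointName, mode: 'off' });
}

// Example configureFailPoint document (MongoDB 4.0+ failCommand fail point).
const exampleFailPoint = {
  configureFailPoint: 'failCommand',
  mode: { times: 1 },
  data: { failCommands: ['getMore'], closeConnection: true }
};

// enableFailPoint(client, exampleFailPoint)
//   .then(() => runTestOperations())
//   .then(() => disableFailPoint(client, 'failCommand'));
```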
test/functional/change_stream_tests.js (+18 -16)

```diff
@@ -1272,7 +1272,7 @@ describe('Change Streams', function() {
    // The actual test we wish to run
    test: function(done) {
      var configuration = this.configuration;
-     var fs = require('fs');
+     const stream = require('stream');
      const client = configuration.newClient();
 
      client.connect(function(err, client) {
@@ -1282,30 +1282,32 @@ describe('Change Streams', function() {
        var theCollection = theDatabase.collection('pipeTest');
        var thisChangeStream = theCollection.watch(pipeline);
 
-       var filename = '/tmp/_nodemongodbnative_stream_out.txt';
-       var outStream = fs.createWriteStream(filename);
+       var outStream = new stream.PassThrough({ objectMode: true });
 
        // Make a stream transforming to JSON and piping to the file
        thisChangeStream.stream({ transform: JSON.stringify }).pipe(outStream);
 
+       function close(_err) {
+         thisChangeStream.close(err => client.close(cErr => done(_err || err || cErr)));
+       }
+
+       outStream
+         .on('data', data => {
+           try {
+             var parsedEvent = JSON.parse(data);
+             assert.equal(parsedEvent.fullDocument.a, 1);
+             close();
+           } catch (e) {
+             close(e);
+           }
+         })
+         .on('error', close);
+
        setTimeout(() => {
          theCollection.insert({ a: 1 }, function(err) {
            assert.ifError(err);
          });
        });
-
-       // Listen for changes to the file
-       var watcher = fs.watch(filename, function(eventType) {
-         assert.equal(eventType, 'change');
-
-         var fileContents = fs.readFileSync(filename, 'utf8');
-         var parsedFileContents = JSON.parse(fileContents);
-         assert.equal(parsedFileContents.fullDocument.a, 1);
-
-         watcher.close();
-
-         thisChangeStream.close(err => client.close(cErr => done(err || cErr)));
-       });
      });
    }
  });
```
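
The rewritten pipe test swaps a temp file plus fs.watch (which is timing-dependent and behaves differently across platforms and Node versions, including 10.16) for an in-memory stream.PassThrough in object mode. A reduced, driver-free sketch of the same pattern, with a plain Readable standing in for the change stream:

```js
// Reduced sketch: pipe a JSON-stringifying object stream into an in-memory
// PassThrough and assert on the 'data' event, instead of watching a temp file.
const stream = require('stream');
const assert = require('assert');

const source = new stream.Readable({
  objectMode: true,
  read() {} // events are pushed manually below
});

const out = new stream.PassThrough({ objectMode: true });

source
  .pipe(new stream.Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      callback(null, JSON.stringify(chunk));
    }
  }))
  .pipe(out);

out.on('data', data => {
  const parsed = JSON.parse(data);
  assert.strictEqual(parsed.fullDocument.a, 1);
  console.log('assertion passed');
});

// Stands in for the change event the insert would produce.
source.push({ fullDocument: { a: 1 } });
```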

test/functional/spec/change-stream/README.rst (+62 -14)

```diff
@@ -33,7 +33,7 @@ Each YAML file has the following keys:
 - ``description``: The name of the test.
 - ``minServerVersion``: The minimum server version to run this test against. If not present, assume there is no minimum server version.
 - ``maxServerVersion``: Reserved for later use
-- ``failPoint``: Reserved for later use
+- ``failPoint``(optional): The configureFailPoint command document to run to configure a fail point on the primary server.
 - ``target``: The entity on which to run the change stream. Valid values are:
 
   - ``collection``: Watch changes on collection ``database_name.collection_name``
@@ -44,10 +44,11 @@ Each YAML file has the following keys:
 - ``changeStreamPipeline``: An array of additional aggregation pipeline stages to add to the change stream
 - ``changeStreamOptions``: Additional options to add to the changeStream
 - ``operations``: Array of documents, each describing an operation. Each document has the following fields:
+
   - ``database``: Database against which to run the operation
   - ``collection``: Collection against which to run the operation
-  - ``commandName``: Name of the command to run
-  - ``arguments``: Object of arguments for the command (ex: document to insert)
+  - ``name``: Name of the command to run
+  - ``arguments`` (optional): Object of arguments for the command (ex: document to insert)
 
 - ``expectations``: Optional list of command-started events in Extended JSON format
 - ``result``: Document with ONE of the following fields:
@@ -103,6 +104,9 @@ For each YAML file, for each element in ``tests``:
 - Drop the database ``database2_name``
 - Create the database ``database_name`` and the collection ``database_name.collection_name``
 - Create the database ``database2_name`` and the collection ``database2_name.collection2_name``
+- If the the ``failPoint`` field is present, configure the fail point on the primary server. See
+  `Server Fail Point <../../transactions/tests#server-fail-point>`_ in the
+  Transactions spec test documentation for more information.
 
 - Create a new MongoClient ``client``
 - Begin monitoring all APM events for ``client``. (If the driver uses global listeners, filter out all events that do not originate with ``client``). Filter out any "internal" commands (e.g. ``isMaster``)
@@ -117,12 +121,12 @@ For each YAML file, for each element in ``tests``:
 - If there was an error:
 
   - Assert that an error was expected for the test.
-  - Assert that the error MATCHES ``results.error``
+  - Assert that the error MATCHES ``result.error``
 
 - Else:
 
   - Assert that no error was expected for the test
-  - Assert that the changes received from ``changeStream`` MATCH the results in ``results.success``
+  - Assert that the changes received from ``changeStream`` MATCH the results in ``result.success``
 
 - If there are any ``expectations``
 
@@ -144,12 +148,56 @@ Prose Tests
 
 The following tests have not yet been automated, but MUST still be tested
 
-1. ``ChangeStream`` must continuously track the last seen ``resumeToken``
-2. ``ChangeStream`` will throw an exception if the server response is missing the resume token
-3. ``ChangeStream`` will automatically resume one time on a resumable error (including `not master`) with the initial pipeline and options, except for the addition/update of a ``resumeToken``.
-4. ``ChangeStream`` will not attempt to resume on a server error
-5. ``ChangeStream`` will perform server selection before attempting to resume, using initial ``readPreference``
-6. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on the driver side.
-7. The ``killCursors`` command sent during the "Resume Process" must not be allowed to throw an exception.
-8. ``$changeStream`` stage for ``ChangeStream`` against a server ``>=4.0`` that has not received any results yet MUST include a ``startAtOperationTime`` option when resuming a changestream.
-9. ``ChangeStream`` will resume after a ``killCursors`` command is issued for its child cursor.
+#. ``ChangeStream`` must continuously track the last seen ``resumeToken``
+#. ``ChangeStream`` will throw an exception if the server response is missing the resume token (if wire version is < 8, this is a driver-side error; for 8+, this is a server-side error)
+#. ``ChangeStream`` will automatically resume one time on a resumable error (including `not master`) with the initial pipeline and options, except for the addition/update of a ``resumeToken``.
+#. ``ChangeStream`` will not attempt to resume on any error encountered while executing an ``aggregate`` command.
+#. ``ChangeStream`` will not attempt to resume after encountering error code 11601 (Interrupted), 136 (CappedPositionLost), or 237 (CursorKilled) while executing a ``getMore`` command.
+#. ``ChangeStream`` will perform server selection before attempting to resume, using initial ``readPreference``
+#. Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed on the driver side.
+#. The ``killCursors`` command sent during the "Resume Process" must not be allowed to throw an exception.
+#. ``$changeStream`` stage for ``ChangeStream`` against a server ``>=4.0`` and ``<4.0.7`` that has not received any results yet MUST include a ``startAtOperationTime`` option when resuming a changestream.
+#. ``ChangeStream`` will resume after a ``killCursors`` command is issued for its child cursor.
+#. - For a ``ChangeStream`` under these conditions:
+     - Running against a server ``>=4.0.7``.
+     - The batch is empty or has been iterated to the last document.
+   - Expected result:
+     - ``getResumeToken`` must return the ``postBatchResumeToken`` from the current command response.
+#. - For a ``ChangeStream`` under these conditions:
+     - Running against a server ``<4.0.7``.
+     - The batch is empty or has been iterated to the last document.
+   - Expected result:
+     - ``getResumeToken`` must return the ``_id`` of the last document returned if one exists.
+     - ``getResumeToken`` must return ``startAfter`` from the initial aggregate if the option was specified.
+     - ``getResumeToken`` must return ``resumeAfter`` from the initial aggregate if the option was specified.
+     - If neither the ``startAfter`` nor ``resumeAfter`` options were specified, the ``getResumeToken`` result must be empty.
+#. - For a ``ChangeStream`` under these conditions:
+     - The batch is not empty.
+     - The batch has been iterated up to but not including the last element.
+   - Expected result:
+     - ``getResumeToken`` must return the ``_id`` of the previous document returned.
+#. - For a ``ChangeStream`` under these conditions:
+     - The batch is not empty.
+     - The batch hasn’t been iterated at all.
+     - Only the initial ``aggregate`` command has been executed.
+   - Expected result:
+     - ``getResumeToken`` must return ``startAfter`` from the initial aggregate if the option was specified.
+     - ``getResumeToken`` must return ``resumeAfter`` from the initial aggregate if the option was specified.
+     - If neither the ``startAfter`` nor ``resumeAfter`` options were specified, the ``getResumeToken`` result must be empty.
+#. - For a ``ChangeStream`` under these conditions:
+     - Running against a server ``>=4.0.7``.
+     - The batch is not empty.
+     - The batch hasn’t been iterated at all.
+     - The stream has iterated beyond a previous batch and a ``getMore`` command has just been executed.
+   - Expected result:
+     - ``getResumeToken`` must return the ``postBatchResumeToken`` from the previous command response.
+#. - For a ``ChangeStream`` under these conditions:
+     - Running against a server ``<4.0.7``.
+     - The batch is not empty.
+     - The batch hasn’t been iterated at all.
+     - The stream has iterated beyond a previous batch and a ``getMore`` command has just been executed.
+   - Expected result:
+     - ``getResumeToken`` must return the ``_id`` of the previous document returned if one exists.
+     - ``getResumeToken`` must return ``startAfter`` from the initial aggregate if the option was specified.
+     - ``getResumeToken`` must return ``resumeAfter`` from the initial aggregate if the option was specified.
+     - If neither the ``startAfter`` nor ``resumeAfter`` options were specified, the ``getResumeToken`` result must be empty.
```
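
The new getResumeToken prose tests above reduce to a precedence rule over a few pieces of cached state. Below is an illustrative, driver-agnostic model of the pre-4.0.7 branch of that rule; the function and field names are invented for the sketch and this is not the Node driver's implementation. On 4.0.7+ servers the cached postBatchResumeToken additionally takes precedence whenever it is at least as recent as the last returned document (an exhausted batch, or a fresh un-iterated batch delivered by a getMore).

```js
// Illustrative model of the pre-4.0.7 precedence described in the prose tests.
// `state` is a hypothetical snapshot of what a driver would cache.
function getResumeTokenPre407(state) {
  const { lastReturnedId, startAfter, resumeAfter } = state;

  if (lastReturnedId) return lastReturnedId; // _id of the last change document returned to the caller
  if (startAfter) return startAfter;         // option from the initial aggregate, if specified
  if (resumeAfter) return resumeAfter;       // option from the initial aggregate, if specified
  return undefined;                          // nothing to resume from yet
}
```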

test/functional/spec/change-stream/change-streams-errors.json (+37)

```diff
@@ -73,6 +73,43 @@
           "code": 40324
         }
       }
+    },
+    {
+      "description": "Change Stream should error when _id is projected out",
+      "minServerVersion": "4.1.11",
+      "target": "collection",
+      "topology": [
+        "replicaset",
+        "sharded"
+      ],
+      "changeStreamPipeline": [
+        {
+          "$project": {
+            "_id": 0
+          }
+        }
+      ],
+      "changeStreamOptions": {},
+      "operations": [
+        {
+          "database": "change-stream-tests",
+          "collection": "test",
+          "name": "insertOne",
+          "arguments": {
+            "document": {
+              "z": 3
+            }
+          }
+        }
+      ],
+      "result": {
+        "error": {
+          "code": 280,
+          "errorLabels": [
+            "NonResumableChangeStreamError"
+          ]
+        }
+      }
     }
   ]
 }
```
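
A result.error expectation like the one above carries both a code and errorLabels, so a runner has to check both. A hedged sketch of such an assertion in the driver's chai style; assertErrorMatches is a hypothetical helper, not the runner's makeTestFailure, which this commit does not show:

```js
// Hypothetical assertion helper for a spec `result.error` block like
// { code: 280, errorLabels: ["NonResumableChangeStreamError"] }.
const { expect } = require('chai');

function assertErrorMatches(err, expectedError) {
  if (expectedError.code != null) {
    expect(err).to.have.property('code', expectedError.code);
  }

  (expectedError.errorLabels || []).forEach(label => {
    // MongoError#hasErrorLabel is the driver's label check; if it is not
    // available, falling back to the raw errorLabels array is equivalent.
    if (typeof err.hasErrorLabel === 'function') {
      expect(err.hasErrorLabel(label)).to.equal(true);
    } else {
      expect(err.errorLabels || []).to.include(label);
    }
  });
}
```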

test/functional/spec/change-stream/change-streams-errors.yml (+24 -1)

```diff
@@ -50,4 +50,27 @@ tests:
           database_name: *database_name
     result:
       error:
-        code: 40324
+        code: 40324
+  -
+    description: Change Stream should error when _id is projected out
+    minServerVersion: "4.1.11"
+    target: collection
+    topology:
+      - replicaset
+      - sharded
+    changeStreamPipeline:
+      -
+        $project: { _id: 0 }
+    changeStreamOptions: {}
+    operations:
+      -
+        database: *database_name
+        collection: *collection_name
+        name: insertOne
+        arguments:
+          document:
+            z: 3
+    result:
+      error:
+        code: 280
+        errorLabels: [ "NonResumableChangeStreamError" ]
```
