Skip to content

Commit f696909

Browse files
durrandariakp
andauthored
feat(NODE-3083): support aggregate writes on secondaries (#3022)
Co-authored-by: Daria Pardue <[email protected]>
1 parent d67eae0 commit f696909

15 files changed

+528
-15
lines changed

src/cmap/connection.ts

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ export interface CommandOptions extends BSONSerializeOptions {
9797
session?: ClientSession;
9898
documentsReturnedIn?: string;
9999
noResponse?: boolean;
100+
omitReadPreference?: boolean;
100101

101102
// FIXME: NODE-2802
102103
willRetryWrite?: boolean;

src/operations/aggregate.ts

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { CommandOperation, CommandOperationOptions, CollationOptions } from './command';
2-
import { ReadPreference } from '../read_preference';
32
import { MongoInvalidArgumentError } from '../error';
43
import { maxWireVersion, MongoDBNamespace } from '../utils';
54
import { Aspect, defineAspects, Hint } from './operation';
@@ -65,7 +64,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
6564
}
6665

6766
if (this.hasWriteStage) {
68-
this.readPreference = ReadPreference.primary;
67+
this.trySecondaryWrite = true;
6968
}
7069

7170
if (this.explain && this.writeConcern) {

src/operations/command.ts

+5
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import type { Server } from '../sdam/server';
1010
import type { BSONSerializeOptions, Document } from '../bson';
1111
import type { ReadConcernLike } from './../read_concern';
1212
import { Explain, ExplainOptions } from '../explain';
13+
import { MIN_SECONDARY_WRITE_WIRE_VERSION } from '../sdam/server_selection';
1314

1415
const SUPPORTS_WRITE_CONCERN_AND_COLLATION = 5;
1516

@@ -126,6 +127,10 @@ export abstract class CommandOperation<T> extends AbstractOperation<T> {
126127
Object.assign(cmd, { readConcern: this.readConcern });
127128
}
128129

130+
if (this.trySecondaryWrite && serverWireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION) {
131+
options.omitReadPreference = true;
132+
}
133+
129134
if (options.collation && serverWireVersion < SUPPORTS_WRITE_CONCERN_AND_COLLATION) {
130135
callback(
131136
new MongoCompatibilityError(

src/operations/execute_operation.ts

+13-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type { Topology } from '../sdam/topology';
1717
import type { ClientSession } from '../sessions';
1818
import type { Document } from '../bson';
1919
import { supportsRetryableWrites } from '../utils';
20+
import { secondaryWritableServerSelector, ServerSelector } from '../sdam/server_selection';
2021

2122
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
2223
const MMAPv1_RETRY_WRITES_ERROR_MESSAGE =
@@ -150,6 +151,16 @@ function executeWithServerSelection(
150151
session.unpin();
151152
}
152153

154+
let selector: ReadPreference | ServerSelector;
155+
156+
// If operation should try to write to secondary use the custom server selector
157+
// otherwise provide the read preference.
158+
if (operation.trySecondaryWrite) {
159+
selector = secondaryWritableServerSelector(topology.commonWireVersion, readPreference);
160+
} else {
161+
selector = readPreference;
162+
}
163+
153164
const serverSelectionOptions = { session };
154165
function callbackWithRetry(err?: any, result?: any) {
155166
if (err == null) {
@@ -182,7 +193,7 @@ function executeWithServerSelection(
182193
}
183194

184195
// select a new server, and attempt to retry the operation
185-
topology.selectServer(readPreference, serverSelectionOptions, (e?: any, server?: any) => {
196+
topology.selectServer(selector, serverSelectionOptions, (e?: any, server?: any) => {
186197
if (
187198
e ||
188199
(operation.hasAspect(Aspect.READ_OPERATION) && !supportsRetryableReads(server)) ||
@@ -227,7 +238,7 @@ function executeWithServerSelection(
227238
}
228239

229240
// select a server, and execute the operation against it
230-
topology.selectServer(readPreference, serverSelectionOptions, (err?: any, server?: any) => {
241+
topology.selectServer(selector, serverSelectionOptions, (err?: any, server?: any) => {
231242
if (err) {
232243
callback(err);
233244
return;

src/operations/operation.ts

+3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export interface OperationOptions extends BSONSerializeOptions {
3131

3232
/** @internal Hints to `executeOperation` that this operation should not unpin on an ended transaction */
3333
bypassPinningCheck?: boolean;
34+
omitReadPreference?: boolean;
3435
}
3536

3637
/** @internal */
@@ -49,6 +50,7 @@ export abstract class AbstractOperation<TResult = any> {
4950
readPreference: ReadPreference;
5051
server!: Server;
5152
bypassPinningCheck: boolean;
53+
trySecondaryWrite: boolean;
5254

5355
// BSON serialization options
5456
bsonOptions?: BSONSerializeOptions;
@@ -72,6 +74,7 @@ export abstract class AbstractOperation<TResult = any> {
7274

7375
this.options = options;
7476
this.bypassPinningCheck = !!options.bypassPinningCheck;
77+
this.trySecondaryWrite = false;
7578
}
7679

7780
abstract execute(server: Server, session: ClientSession, callback: Callback<TResult>): void;

src/sdam/server.ts

+8
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
299299
// Clone the options
300300
const finalOptions = Object.assign({}, options, { wireProtocolCommand: false });
301301

302+
// There are cases where we need to flag the read preference not to get sent in
303+
// the command, such as pre-5.0 servers attempting to perform an aggregate write
304+
// with a non-primary read preference. In this case the effective read preference
305+
// (primary) is not the same as the provided and must be removed completely.
306+
if (finalOptions.omitReadPreference) {
307+
delete finalOptions.readPreference;
308+
}
309+
302310
// error if collation not supported
303311
if (collationNotSupported(this, cmd)) {
304312
callback(new MongoCompatibilityError(`Server ${this.name} does not support collation`));

src/sdam/server_selection.ts

+25
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import type { ServerDescription, TagSet } from './server_description';
88
const IDLE_WRITE_PERIOD = 10000;
99
const SMALLEST_MAX_STALENESS_SECONDS = 90;
1010

11+
// Minimum version to try writes on secondaries.
12+
export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
13+
1114
/** @public */
1215
export type ServerSelector = (
1316
topologyDescription: TopologyDescription,
@@ -28,6 +31,28 @@ export function writableServerSelector(): ServerSelector {
2831
);
2932
}
3033

34+
/**
35+
* Returns a server selector that uses a read preference to select a
36+
* server potentially for a write on a secondary.
37+
*/
38+
export function secondaryWritableServerSelector(
39+
wireVersion?: number,
40+
readPreference?: ReadPreference
41+
): ServerSelector {
42+
// If server version < 5.0, read preference always primary.
43+
// If server version >= 5.0...
44+
// - If read preference is supplied, use that.
45+
// - If no read preference is supplied, use primary.
46+
if (
47+
!readPreference ||
48+
!wireVersion ||
49+
(wireVersion && wireVersion < MIN_SECONDARY_WRITE_WIRE_VERSION)
50+
) {
51+
return readPreferenceServerSelector(ReadPreference.primary);
52+
}
53+
return readPreferenceServerSelector(readPreference);
54+
}
55+
3156
/**
3257
* Reduces the passed in array of servers by the rules of the "Max Staleness" specification
3358
* found here: https://github.com/mongodb/specifications/blob/master/source/max-staleness/max-staleness.rst

src/sdam/topology.ts

+4
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
797797
return result;
798798
}
799799

800+
get commonWireVersion(): number | undefined {
801+
return this.description.commonWireVersion;
802+
}
803+
800804
get logicalSessionTimeoutMinutes(): number | undefined {
801805
return this.description.logicalSessionTimeoutMinutes;
802806
}

test/functional/crud_spec.test.js

+1-5
Original file line numberDiff line numberDiff line change
@@ -424,15 +424,11 @@ describe('CRUD spec v1', function () {
424424
}
425425
});
426426

427-
// TODO: Unskip when implementing NODE-3083.
428-
const SKIP = ['aggregate-write-readPreference', 'db-aggregate-write-readPreference'];
429-
430427
describe('CRUD unified', function () {
431428
for (const crudSpecTest of loadSpecTests('crud/unified')) {
432429
expect(crudSpecTest).to.exist;
433430
const testDescription = String(crudSpecTest.description);
434-
const spec = SKIP.includes(testDescription) ? context.skip : context;
435-
spec(testDescription, function () {
431+
context(testDescription, function () {
436432
for (const test of crudSpecTest.tests) {
437433
it(String(test.description), {
438434
metadata: { sessions: { skipLeakTests: true } },

test/spec/crud/unified/aggregate-write-readPreference.json

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"description": "aggregate-write-readPreference",
3-
"schemaVersion": "1.3",
3+
"schemaVersion": "1.4",
44
"runOnRequirements": [
55
{
66
"minServerVersion": "3.6",
@@ -90,7 +90,8 @@
9090
"description": "Aggregate with $out includes read preference for 5.0+ server",
9191
"runOnRequirements": [
9292
{
93-
"minServerVersion": "5.0"
93+
"minServerVersion": "5.0",
94+
"serverless": "forbid"
9495
}
9596
],
9697
"operations": [
@@ -181,7 +182,8 @@
181182
"runOnRequirements": [
182183
{
183184
"minServerVersion": "4.2",
184-
"maxServerVersion": "4.4.99"
185+
"maxServerVersion": "4.4.99",
186+
"serverless": "forbid"
185187
}
186188
],
187189
"operations": [

test/spec/crud/unified/aggregate-write-readPreference.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
description: aggregate-write-readPreference
22

3-
schemaVersion: '1.3'
3+
schemaVersion: '1.4'
44

55
runOnRequirements:
66
# 3.6+ non-standalone is needed to utilize $readPreference in OP_MSG
@@ -59,6 +59,7 @@ tests:
5959
- description: "Aggregate with $out includes read preference for 5.0+ server"
6060
runOnRequirements:
6161
- minServerVersion: "5.0"
62+
serverless: "forbid"
6263
operations:
6364
- object: *collection0
6465
name: aggregate
@@ -91,6 +92,7 @@ tests:
9192
# drivers may avoid inheriting a client-level read concern for pre-4.2.
9293
- minServerVersion: "4.2"
9394
maxServerVersion: "4.4.99"
95+
serverless: "forbid"
9496
operations:
9597
- object: *collection0
9698
name: aggregate

test/spec/crud/unified/db-aggregate-write-readPreference.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
"description": "Database-level aggregate with $out includes read preference for 5.0+ server",
6565
"runOnRequirements": [
6666
{
67-
"minServerVersion": "5.0"
67+
"minServerVersion": "5.0",
68+
"serverless": "forbid"
6869
}
6970
],
7071
"operations": [
@@ -158,7 +159,8 @@
158159
"runOnRequirements": [
159160
{
160161
"minServerVersion": "4.2",
161-
"maxServerVersion": "4.4.99"
162+
"maxServerVersion": "4.4.99",
163+
"serverless": "forbid"
162164
}
163165
],
164166
"operations": [

test/spec/crud/unified/db-aggregate-write-readPreference.yml

+2
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ tests:
5252
- description: "Database-level aggregate with $out includes read preference for 5.0+ server"
5353
runOnRequirements:
5454
- minServerVersion: "5.0"
55+
serverless: "forbid"
5556
operations:
5657
- object: *database0
5758
name: aggregate
@@ -85,6 +86,7 @@ tests:
8586
# drivers may avoid inheriting a client-level read concern for pre-4.2.
8687
- minServerVersion: "4.2"
8788
maxServerVersion: "4.4.99"
89+
serverless: "forbid"
8890
operations:
8991
- object: *database0
9092
name: aggregate
+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
'use strict';
2+
3+
const { expect } = require('chai');
4+
const { AggregateOperation } = require('../../../src/operations/aggregate');
5+
6+
describe('AggregateOperation', function () {
7+
const db = 'test';
8+
9+
describe('#constructor', function () {
10+
context('when out is in the options', function () {
11+
const operation = new AggregateOperation(db, [], { out: 'test', dbName: db });
12+
13+
it('sets trySecondaryWrite to true', function () {
14+
expect(operation.trySecondaryWrite).to.be.true;
15+
});
16+
});
17+
18+
context('when $out is the last stage', function () {
19+
const operation = new AggregateOperation(db, [{ $out: 'test' }], { dbName: db });
20+
21+
it('sets trySecondaryWrite to true', function () {
22+
expect(operation.trySecondaryWrite).to.be.true;
23+
});
24+
});
25+
26+
context('when $out is not the last stage', function () {
27+
const operation = new AggregateOperation(db, [{ $out: 'test' }, { $project: { name: 1 } }], {
28+
dbName: db
29+
});
30+
31+
it('sets trySecondaryWrite to false', function () {
32+
expect(operation.trySecondaryWrite).to.be.false;
33+
});
34+
});
35+
36+
context('when $merge is the last stage', function () {
37+
const operation = new AggregateOperation(db, [{ $merge: { into: 'test' } }], { dbName: db });
38+
39+
it('sets trySecondaryWrite to true', function () {
40+
expect(operation.trySecondaryWrite).to.be.true;
41+
});
42+
});
43+
44+
context('when $merge is not the last stage', function () {
45+
const operation = new AggregateOperation(
46+
db,
47+
[{ $merge: { into: 'test' } }, { $project: { name: 1 } }],
48+
{ dbName: db }
49+
);
50+
51+
it('sets trySecondaryWrite to false', function () {
52+
expect(operation.trySecondaryWrite).to.be.false;
53+
});
54+
});
55+
56+
context('when no writable stages in empty pipeline', function () {
57+
const operation = new AggregateOperation(db, [], { dbName: db });
58+
59+
it('sets trySecondaryWrite to false', function () {
60+
expect(operation.trySecondaryWrite).to.be.false;
61+
});
62+
});
63+
64+
context('when no writable stages', function () {
65+
const operation = new AggregateOperation(db, [{ $project: { name: 1 } }], { dbName: db });
66+
67+
it('sets trySecondaryWrite to false', function () {
68+
expect(operation.trySecondaryWrite).to.be.false;
69+
});
70+
});
71+
});
72+
});

0 commit comments

Comments
 (0)