Skip to content

Commit f7c3b34

Browse files
committed
feat(NODE-3470): retry selects another mongos
1 parent 86e2659 commit f7c3b34

6 files changed

+455
-23
lines changed

Diff for: src/operations/execute_operation.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
import type { MongoClient } from '../mongo_client';
1919
import { ReadPreference } from '../read_preference';
2020
import type { Server } from '../sdam/server';
21+
import type { ServerDescription } from '../sdam/server_description';
2122
import {
2223
sameServerSelector,
2324
secondaryWritableServerSelector,
@@ -183,7 +184,8 @@ export async function executeOperation<
183184
return await retryOperation(operation, operationError, {
184185
session,
185186
topology,
186-
selector
187+
selector,
188+
previousServer: server.description
187189
});
188190
}
189191
throw operationError;
@@ -199,6 +201,7 @@ type RetryOptions = {
199201
session: ClientSession;
200202
topology: Topology;
201203
selector: ReadPreference | ServerSelector;
204+
previousServer: ServerDescription;
202205
};
203206

204207
async function retryOperation<
@@ -207,7 +210,7 @@ async function retryOperation<
207210
>(
208211
operation: T,
209212
originalError: MongoError,
210-
{ session, topology, selector }: RetryOptions
213+
{ session, topology, selector, previousServer }: RetryOptions
211214
): Promise<TResult> {
212215
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
213216
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
@@ -243,7 +246,8 @@ async function retryOperation<
243246
// select a new server, and attempt to retry the operation
244247
const server = await topology.selectServerAsync(selector, {
245248
session,
246-
operationName: operation.commandName
249+
operationName: operation.commandName,
250+
previousServer: previousServer
247251
});
248252

249253
if (isWriteOperation && !supportsRetryableWrites(server)) {

Diff for: src/sdam/server_selection.ts

+10-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
1414
/** @internal */
1515
export type ServerSelector = (
1616
topologyDescription: TopologyDescription,
17-
servers: ServerDescription[]
17+
servers: ServerDescription[],
18+
deprioritized?: ServerDescription[]
1819
) => ServerDescription[];
1920

2021
/**
@@ -266,7 +267,8 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
266267

267268
return (
268269
topologyDescription: TopologyDescription,
269-
servers: ServerDescription[]
270+
servers: ServerDescription[],
271+
deprioritized: ServerDescription[] = []
270272
): ServerDescription[] => {
271273
const commonWireVersion = topologyDescription.commonWireVersion;
272274
if (
@@ -287,13 +289,15 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
287289
return [];
288290
}
289291

290-
if (
291-
topologyDescription.type === TopologyType.Single ||
292-
topologyDescription.type === TopologyType.Sharded
293-
) {
292+
if (topologyDescription.type === TopologyType.Single) {
294293
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
295294
}
296295

296+
if (topologyDescription.type === TopologyType.Sharded) {
297+
const selectable = servers.length > 0 ? servers : deprioritized;
298+
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
299+
}
300+
297301
const mode = readPreference.mode;
298302
if (mode === ReadPreference.PRIMARY) {
299303
return servers.filter(primaryFilter);

Diff for: src/sdam/topology.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export interface ServerSelectionRequest {
110110
timeoutController: TimeoutController;
111111
operationName: string;
112112
waitingLogged: boolean;
113+
previousServer?: ServerDescription;
113114
}
114115

115116
/** @internal */
@@ -175,6 +176,7 @@ export interface SelectServerOptions {
175176
serverSelectionTimeoutMS?: number;
176177
session?: ClientSession;
177178
operationName: string;
179+
previousServer?: ServerDescription;
178180
}
179181

180182
/** @public */
@@ -598,7 +600,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
598600
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
599601
startTime: now(),
600602
operationName: options.operationName,
601-
waitingLogged: false
603+
waitingLogged: false,
604+
previousServer: options.previousServer
602605
};
603606

604607
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
@@ -930,8 +933,13 @@ function processWaitQueue(topology: Topology) {
930933
let selectedDescriptions;
931934
try {
932935
const serverSelector = waitQueueMember.serverSelector;
936+
const previousServer = waitQueueMember.previousServer;
933937
selectedDescriptions = serverSelector
934-
? serverSelector(topology.description, serverDescriptions)
938+
? serverSelector(
939+
topology.description,
940+
serverDescriptions,
941+
previousServer ? [previousServer] : []
942+
)
935943
: serverDescriptions;
936944
} catch (e) {
937945
waitQueueMember.timeoutController.clear();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
import { expect } from 'chai';
2+
3+
import type { CommandFailedEvent, CommandSucceededEvent } from '../../mongodb';
4+
5+
const TEST_METADATA = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } };
6+
const FAIL_COMMAND = {
7+
configureFailPoint: 'failCommand',
8+
mode: { times: 1 },
9+
data: {
10+
failCommands: ['find'],
11+
errorCode: 6,
12+
closeConnection: true
13+
}
14+
};
15+
const DISABLE_FAIL_COMMAND = {
16+
configureFailPoint: 'failCommand',
17+
mode: 'off',
18+
data: {
19+
failCommands: ['find'],
20+
errorCode: 6,
21+
closeConnection: true
22+
}
23+
};
24+
25+
describe('Server Selection Sharded Retryable Reads Prose tests', function () {
26+
context('Retryable Reads Are Retried on a Different mongos if One is Available', function () {
27+
const commandFailedEvents: CommandFailedEvent[] = [];
28+
let client;
29+
let utilClientOne;
30+
let utilClientTwo;
31+
32+
// This test MUST be executed against a sharded cluster that has at least two
33+
// mongos instances.
34+
// 1. Ensure that a test is run against a sharded cluster that has at least two
35+
// mongoses. If there are more than two mongoses in the cluster, pick two to
36+
// test against.
37+
beforeEach(async function () {
38+
const uri = this.configuration.url({
39+
monitorCommands: true,
40+
useMultipleMongoses: true
41+
});
42+
43+
// 3. Create a client with ``retryReads=true`` that connects to the cluster,
44+
// providing the two selected mongoses as seeds.
45+
client = this.configuration.newClient(uri, {
46+
monitorCommands: true,
47+
retryReads: true
48+
});
49+
client.on('commandFailed', event => {
50+
commandFailedEvents.push(event);
51+
});
52+
await client.connect();
53+
const seeds = client.topology.s.seedlist.map(address => address.toString());
54+
55+
// 2. Create a client per mongos using the direct connection, and configure the
56+
// following fail points on each mongos::
57+
// {
58+
// configureFailPoint: "failCommand",
59+
// mode: { times: 1 },
60+
// data: {
61+
// failCommands: ["find"],
62+
// errorCode: 6,
63+
// closeConnection: true
64+
// }
65+
// }
66+
utilClientOne = this.configuration.newClient(`mongodb://${seeds[0]}`, {
67+
directConnection: true
68+
});
69+
utilClientTwo = this.configuration.newClient(`mongodb://${seeds[1]}`, {
70+
directConnection: true
71+
});
72+
await utilClientOne.db('admin').command(FAIL_COMMAND);
73+
await utilClientTwo.db('admin').command(FAIL_COMMAND);
74+
});
75+
76+
afterEach(async function () {
77+
await client?.close();
78+
await utilClientOne.db('admin').command(DISABLE_FAIL_COMMAND);
79+
await utilClientTwo.db('admin').command(DISABLE_FAIL_COMMAND);
80+
await utilClientOne?.close();
81+
await utilClientTwo?.close();
82+
});
83+
84+
// 4. Enable command monitoring, and execute a ``find`` command that is
85+
// supposed to fail on both mongoses.
86+
// 5. Asserts that there were failed command events from each mongos.
87+
// 6. Disable the fail points.
88+
it('retries on a different mongos', TEST_METADATA, async function () {
89+
await client
90+
.db('test')
91+
.collection('test')
92+
.find()
93+
.toArray()
94+
.catch(() => null);
95+
expect(commandFailedEvents[0].address).to.not.equal(commandFailedEvents[1].address);
96+
});
97+
});
98+
99+
// 1. Ensure that a test is run against a sharded cluster. If there are multiple
100+
// mongoses in the cluster, pick one to test against.
101+
context('Retryable Reads Are Retried on the Same mongos if No Others are Available', function () {
102+
const commandFailedEvents: CommandFailedEvent[] = [];
103+
const commandSucceededEvents: CommandSucceededEvent[] = [];
104+
let client;
105+
let utilClient;
106+
107+
beforeEach(async function () {
108+
const uri = this.configuration.url({
109+
monitorCommands: true
110+
});
111+
112+
// 3. Create a client with ``retryReads=true`` that connects to the cluster,
113+
// providing the selected mongos as the seed.
114+
client = this.configuration.newClient(uri, {
115+
monitorCommands: true,
116+
retryReads: true
117+
});
118+
client.on('commandFailed', event => {
119+
commandFailedEvents.push(event);
120+
});
121+
client.on('commandSucceeded', event => {
122+
commandSucceededEvents.push(event);
123+
});
124+
125+
// 2. Create a client that connects to the mongos using the direct connection,
126+
// and configure the following fail point on the mongos::
127+
// {
128+
// configureFailPoint: "failCommand",
129+
// mode: { times: 1 },
130+
// data: {
131+
// failCommands: ["find"],
132+
// errorCode: 6,
133+
// closeConnection: true
134+
// }
135+
// }
136+
utilClient = this.configuration.newClient(uri, {
137+
directConnection: true
138+
});
139+
await utilClient.db('admin').command(FAIL_COMMAND);
140+
});
141+
142+
afterEach(async function () {
143+
await client?.close();
144+
await utilClient?.db('admin').command(DISABLE_FAIL_COMMAND);
145+
await utilClient?.close();
146+
});
147+
148+
// 4. Enable command monitoring, and execute a ``find`` command.
149+
// 5. Asserts that there was a failed command and a successful command event.
150+
// 6. Disable the fail point.
151+
it('retries on the same mongos', TEST_METADATA, async function () {
152+
await client
153+
.db('test')
154+
.collection('test')
155+
.find()
156+
.toArray()
157+
.catch(() => null);
158+
expect(commandFailedEvents[0].address).to.equal(commandSucceededEvents[0].address);
159+
});
160+
});
161+
});

0 commit comments

Comments
 (0)