Skip to content

Commit 348cc5c

Browse files
committed
feat(NODE-3470): retry selects another mongos
1 parent 8504d91 commit 348cc5c

6 files changed

+447
-23
lines changed

Diff for: src/operations/execute_operation.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
import type { MongoClient } from '../mongo_client';
1919
import { ReadPreference } from '../read_preference';
2020
import type { Server } from '../sdam/server';
21+
import type { ServerDescription } from '../sdam/server_description';
2122
import {
2223
sameServerSelector,
2324
secondaryWritableServerSelector,
@@ -183,7 +184,8 @@ export async function executeOperation<
183184
return await retryOperation(operation, operationError, {
184185
session,
185186
topology,
186-
selector
187+
selector,
188+
previousServer: server.description
187189
});
188190
}
189191
throw operationError;
@@ -199,6 +201,7 @@ type RetryOptions = {
199201
session: ClientSession;
200202
topology: Topology;
201203
selector: ReadPreference | ServerSelector;
204+
previousServer: ServerDescription;
202205
};
203206

204207
async function retryOperation<
@@ -207,7 +210,7 @@ async function retryOperation<
207210
>(
208211
operation: T,
209212
originalError: MongoError,
210-
{ session, topology, selector }: RetryOptions
213+
{ session, topology, selector, previousServer }: RetryOptions
211214
): Promise<TResult> {
212215
const isWriteOperation = operation.hasAspect(Aspect.WRITE_OPERATION);
213216
const isReadOperation = operation.hasAspect(Aspect.READ_OPERATION);
@@ -243,7 +246,8 @@ async function retryOperation<
243246
// select a new server, and attempt to retry the operation
244247
const server = await topology.selectServerAsync(selector, {
245248
session,
246-
operationName: operation.commandName
249+
operationName: operation.commandName,
250+
previousServer: previousServer
247251
});
248252

249253
if (isWriteOperation && !supportsRetryableWrites(server)) {

Diff for: src/sdam/server_selection.ts

+10-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ export const MIN_SECONDARY_WRITE_WIRE_VERSION = 13;
1414
/** @internal */
1515
export type ServerSelector = (
1616
topologyDescription: TopologyDescription,
17-
servers: ServerDescription[]
17+
servers: ServerDescription[],
18+
deprioritized?: ServerDescription[]
1819
) => ServerDescription[];
1920

2021
/**
@@ -266,7 +267,8 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
266267

267268
return (
268269
topologyDescription: TopologyDescription,
269-
servers: ServerDescription[]
270+
servers: ServerDescription[],
271+
deprioritized: ServerDescription[] = []
270272
): ServerDescription[] => {
271273
const commonWireVersion = topologyDescription.commonWireVersion;
272274
if (
@@ -287,13 +289,15 @@ export function readPreferenceServerSelector(readPreference: ReadPreference): Se
287289
return [];
288290
}
289291

290-
if (
291-
topologyDescription.type === TopologyType.Single ||
292-
topologyDescription.type === TopologyType.Sharded
293-
) {
292+
if (topologyDescription.type === TopologyType.Single) {
294293
return latencyWindowReducer(topologyDescription, servers.filter(knownFilter));
295294
}
296295

296+
if (topologyDescription.type === TopologyType.Sharded) {
297+
const selectable = servers.length > 0 ? servers : deprioritized;
298+
return latencyWindowReducer(topologyDescription, selectable.filter(knownFilter));
299+
}
300+
297301
const mode = readPreference.mode;
298302
if (mode === ReadPreference.PRIMARY) {
299303
return servers.filter(primaryFilter);

Diff for: src/sdam/topology.ts

+10-2
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ export interface ServerSelectionRequest {
110110
timeoutController: TimeoutController;
111111
operationName: string;
112112
waitingLogged: boolean;
113+
previousServer?: ServerDescription;
113114
}
114115

115116
/** @internal */
@@ -175,6 +176,7 @@ export interface SelectServerOptions {
175176
serverSelectionTimeoutMS?: number;
176177
session?: ClientSession;
177178
operationName: string;
179+
previousServer?: ServerDescription;
178180
}
179181

180182
/** @public */
@@ -598,7 +600,8 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
598600
timeoutController: new TimeoutController(options.serverSelectionTimeoutMS),
599601
startTime: now(),
600602
operationName: options.operationName,
601-
waitingLogged: false
603+
waitingLogged: false,
604+
previousServer: options.previousServer
602605
};
603606

604607
waitQueueMember.timeoutController.signal.addEventListener('abort', () => {
@@ -930,8 +933,13 @@ function processWaitQueue(topology: Topology) {
930933
let selectedDescriptions;
931934
try {
932935
const serverSelector = waitQueueMember.serverSelector;
936+
const previousServer = waitQueueMember.previousServer;
933937
selectedDescriptions = serverSelector
934-
? serverSelector(topology.description, serverDescriptions)
938+
? serverSelector(
939+
topology.description,
940+
serverDescriptions,
941+
previousServer ? [previousServer] : []
942+
)
935943
: serverDescriptions;
936944
} catch (e) {
937945
waitQueueMember.timeoutController.clear();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import { expect } from 'chai';
2+
3+
import type { CommandFailedEvent, CommandSucceededEvent } from '../../mongodb';
4+
5+
const TEST_METADATA = { requires: { mongodb: '>=4.2.9', topology: 'sharded' } };
6+
const FAIL_COMMAND = {
7+
configureFailPoint: 'failCommand',
8+
mode: { times: 1 },
9+
data: {
10+
failCommands: ['find'],
11+
errorCode: 6,
12+
closeConnection: true
13+
}
14+
};
15+
const DISABLE_FAIL_COMMAND = {
16+
configureFailPoint: 'failCommand',
17+
mode: 'off',
18+
data: {
19+
failCommands: ['find'],
20+
errorCode: 6,
21+
closeConnection: true
22+
}
23+
};
24+
25+
describe('Server Selection Sharded Retryable Reads Prose tests', function () {
26+
context('Retryable Reads Are Retried on a Different mongos if One is Available', function () {
27+
const commandFailedEvents: CommandFailedEvent[] = [];
28+
let client;
29+
let utilClientOne;
30+
let utilClientTwo;
31+
32+
// This test MUST be executed against a sharded cluster that has at least two
33+
// mongos instances.
34+
// 1. Ensure that a test is run against a sharded cluster that has at least two
35+
// mongoses. If there are more than two mongoses in the cluster, pick two to
36+
// test against.
37+
beforeEach(async function () {
38+
const uri = this.configuration.url({
39+
monitorCommands: true,
40+
useMultipleMongoses: true
41+
});
42+
console.log('uri', uri);
43+
44+
// 3. Create a client with ``retryReads=true`` that connects to the cluster,
45+
// providing the two selected mongoses as seeds.
46+
client = this.configuration.newClient(uri, {
47+
monitorCommands: true,
48+
retryReads: true
49+
});
50+
client.on('commandFailed', event => {
51+
commandFailedEvents.push(event);
52+
});
53+
await client.connect();
54+
const seeds = client.topology.s.seedlist.map(address => address.toString());
55+
console.log('seeds', seeds);
56+
57+
// 2. Create a client per mongos using the direct connection, and configure the
58+
// following fail points on each mongos::
59+
// {
60+
// configureFailPoint: "failCommand",
61+
// mode: { times: 1 },
62+
// data: {
63+
// failCommands: ["find"],
64+
// errorCode: 6,
65+
// closeConnection: true
66+
// }
67+
// }
68+
utilClientOne = this.configuration.newClient(`mongodb://${seeds[0]}`, {
69+
directConnection: true
70+
});
71+
utilClientTwo = this.configuration.newClient(`mongodb://${seeds[1]}`, {
72+
directConnection: true
73+
});
74+
await utilClientOne.db('admin').command(FAIL_COMMAND);
75+
await utilClientTwo.db('admin').command(FAIL_COMMAND);
76+
});
77+
78+
afterEach(async function () {
79+
await client?.close();
80+
await utilClientOne.db('admin').command(DISABLE_FAIL_COMMAND);
81+
await utilClientTwo.db('admin').command(DISABLE_FAIL_COMMAND);
82+
await utilClientOne?.close();
83+
await utilClientTwo?.close();
84+
});
85+
86+
// 4. Enable command monitoring, and execute a ``find`` command that is
87+
// supposed to fail on both mongoses.
88+
// 5. Asserts that there were failed command events from each mongos.
89+
// 6. Disable the fail points.
90+
it('retries on a different mongos', TEST_METADATA, async function () {
91+
await client.db('test').collection('test').find().toArray().catch(error => null);
92+
console.log(commandFailedEvents);
93+
expect(commandFailedEvents[0]).to.not.deep.equal(commandFailedEvents[1]);
94+
});
95+
});
96+
97+
// 1. Ensure that a test is run against a sharded cluster. If there are multiple
98+
// mongoses in the cluster, pick one to test against.
99+
context('Retryable Reads Are Retried on the Same mongos if No Others are Available', function () {
100+
const commandFailedEvents: CommandFailedEvent[] = [];
101+
const commandSucceededEvents: CommandSucceededEvent[] = [];
102+
let client;
103+
let utilClient;
104+
105+
beforeEach(async function () {
106+
const uri = this.configuration.url({
107+
monitorCommands: true
108+
});
109+
console.log('uri', uri);
110+
111+
// 3. Create a client with ``retryReads=true`` that connects to the cluster,
112+
// providing the selected mongos as the seed.
113+
client = this.configuration.newClient(uri, {
114+
monitorCommands: true,
115+
retryReads: true
116+
});
117+
client.on('commandFailed', event => {
118+
commandFailedEvents.push(event);
119+
});
120+
client.on('commandSucceeded', event => {
121+
commandSucceededEvents.push(event);
122+
});
123+
124+
// 2. Create a client that connects to the mongos using the direct connection,
125+
// and configure the following fail point on the mongos::
126+
// {
127+
// configureFailPoint: "failCommand",
128+
// mode: { times: 1 },
129+
// data: {
130+
// failCommands: ["find"],
131+
// errorCode: 6,
132+
// closeConnection: true
133+
// }
134+
// }
135+
utilClient = this.configuration.newClient(uri, {
136+
directConnection: true
137+
});
138+
await utilClient.db('admin').command(FAIL_COMMAND);
139+
});
140+
141+
afterEach(async function () {
142+
await client?.close();
143+
await utilClient?.db('admin').command(DISABLE_FAIL_COMMAND);
144+
await utilClient?.close();
145+
});
146+
147+
// 4. Enable command monitoring, and execute a ``find`` command.
148+
// 5. Asserts that there was a failed command and a successful command event.
149+
// 6. Disable the fail point.
150+
it('retries on the same mongos', TEST_METADATA, async function () {
151+
await client.db('test').collection('test').find().toArray().catch(error => null);
152+
console.log(commandFailedEvents);
153+
console.log(commandSucceededEvents);
154+
});
155+
});
156+
});

0 commit comments

Comments
 (0)