@@ -24,7 +24,7 @@ import {
24
24
} from '../sdam/server_selection' ;
25
25
import type { Topology } from '../sdam/topology' ;
26
26
import type { ClientSession } from '../sessions' ;
27
- import { squashError , supportsRetryableWrites } from '../utils' ;
27
+ import { supportsRetryableWrites } from '../utils' ;
28
28
import { AbstractOperation , Aspect } from './operation' ;
29
29
30
30
const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES . IllegalOperation ;
@@ -45,10 +45,9 @@ type ResultTypeFromOperation<TOperation> = TOperation extends AbstractOperation<
45
45
* not provided.
46
46
*
47
47
* The expectation is that this function:
48
- * - Connects the MongoClient if it has not already been connected
48
+ * - Connects the MongoClient if it has not already been connected, see { @link autoConnect}
49
49
* - Creates a session if none is provided and cleans up the session it creates
50
- * - Selects a server based on readPreference or various factors
51
- * - Retries an operation if it fails for certain errors, see {@link retryOperation}
50
+ * - Tries an operation and retries under certain conditions, see {@link tryOperation}
52
51
*
53
52
* @typeParam T - The operation's type
54
53
* @typeParam TResult - The type of the operation's result, calculated from T
@@ -65,23 +64,7 @@ export async function executeOperation<
65
64
throw new MongoRuntimeError ( 'This method requires a valid operation instance' ) ;
66
65
}
67
66
68
- if ( client . topology == null ) {
69
- // Auto connect on operation
70
- if ( client . s . hasBeenClosed ) {
71
- throw new MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
72
- }
73
- client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
74
- try {
75
- await client . connect ( ) ;
76
- } finally {
77
- delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
78
- }
79
- }
80
-
81
- const { topology } = client ;
82
- if ( topology == null ) {
83
- throw new MongoRuntimeError ( 'client.connect did not create a topology but also did not throw' ) ;
84
- }
67
+ const topology = await autoConnect ( client ) ;
85
68
86
69
// The driver sessions spec mandates that we implicitly create sessions for operations
87
70
// that are not explicitly provided with a session.
@@ -108,7 +91,6 @@ export async function executeOperation<
108
91
const inTransaction = ! ! session ?. inTransaction ( ) ;
109
92
110
93
const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
111
- const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
112
94
113
95
if (
114
96
inTransaction &&
@@ -124,6 +106,73 @@ export async function executeOperation<
124
106
session . unpin ( ) ;
125
107
}
126
108
109
+ try {
110
+ return await tryOperation ( operation , {
111
+ topology,
112
+ session,
113
+ readPreference
114
+ } ) ;
115
+ } finally {
116
+ if ( session ?. owner != null && session . owner === owner ) {
117
+ await session . endSession ( ) ;
118
+ }
119
+ }
120
+ }
121
+
122
+ /**
123
+ * Connects a client if it has not yet been connected
124
+ * @internal
125
+ */
126
+ async function autoConnect ( client : MongoClient ) : Promise < Topology > {
127
+ if ( client . topology == null ) {
128
+ if ( client . s . hasBeenClosed ) {
129
+ throw new MongoNotConnectedError ( 'Client must be connected before running operations' ) ;
130
+ }
131
+ client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] = true ;
132
+ try {
133
+ await client . connect ( ) ;
134
+ if ( client . topology == null ) {
135
+ throw new MongoRuntimeError (
136
+ 'client.connect did not create a topology but also did not throw'
137
+ ) ;
138
+ }
139
+ return client . topology ;
140
+ } finally {
141
+ delete client . s . options [ Symbol . for ( '@@mdb.skipPingOnConnect' ) ] ;
142
+ }
143
+ }
144
+ return client . topology ;
145
+ }
146
+
147
+ /** @internal */
148
+ type RetryOptions = {
149
+ session : ClientSession | undefined ;
150
+ readPreference : ReadPreference ;
151
+ topology : Topology ;
152
+ } ;
153
+
154
+ /**
155
+ * Executes an operation and retries as appropriate
156
+ * @internal
157
+ *
158
+ * @remarks
159
+ * Implements behaviour described in [Retryable Reads](https://github.com/mongodb/specifications/blob/master/source/retryable-reads/retryable-reads.md) and [Retryable
160
+ * Writes](https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.md) specification
161
+ *
162
+ * This function:
163
+ * - performs initial server selection
164
+ * - attempts to execute an operation
165
+ * - retries the operation if it meets the criteria for a retryable read or a retryable write
166
+ *
167
+ * @typeParam T - The operation's type
168
+ * @typeParam TResult - The type of the operation's result, calculated from T
169
+ *
170
+ * @param operation - The operation to execute
171
+ * */
172
+ async function tryOperation <
173
+ T extends AbstractOperation < TResult > ,
174
+ TResult = ResultTypeFromOperation < T >
175
+ > ( operation : T , { topology, session, readPreference } : RetryOptions ) : Promise < TResult > {
127
176
let selector : ReadPreference | ServerSelector ;
128
177
129
178
if ( operation . hasAspect ( Aspect . MUST_SELECT_SAME_SERVER ) ) {
@@ -139,30 +188,14 @@ export async function executeOperation<
139
188
selector = readPreference ;
140
189
}
141
190
142
- const server = await topology . selectServer ( selector , {
191
+ let server = await topology . selectServer ( selector , {
143
192
session,
144
193
operationName : operation . commandName
145
194
} ) ;
146
195
147
- if ( session == null ) {
148
- // No session also means it is not retryable, early exit
149
- return await operation . execute ( server , undefined ) ;
150
- }
151
-
152
- if ( ! operation . hasAspect ( Aspect . RETRYABLE ) ) {
153
- // non-retryable operation, early exit
154
- try {
155
- return await operation . execute ( server , session ) ;
156
- } finally {
157
- if ( session ?. owner != null && session . owner === owner ) {
158
- try {
159
- await session . endSession ( ) ;
160
- } catch ( error ) {
161
- squashError ( error ) ;
162
- }
163
- }
164
- }
165
- }
196
+ const hasReadAspect = operation . hasAspect ( Aspect . READ_OPERATION ) ;
197
+ const hasWriteAspect = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
198
+ const inTransaction = session ?. inTransaction ( ) ?? false ;
166
199
167
200
const willRetryRead = topology . s . options . retryReads && ! inTransaction && operation . canRetryRead ;
168
201
@@ -172,105 +205,76 @@ export async function executeOperation<
172
205
supportsRetryableWrites ( server ) &&
173
206
operation . canRetryWrite ;
174
207
175
- const willRetry = ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ;
208
+ const willRetry =
209
+ operation . hasAspect ( Aspect . RETRYABLE ) &&
210
+ session != null &&
211
+ ( ( hasReadAspect && willRetryRead ) || ( hasWriteAspect && willRetryWrite ) ) ;
176
212
177
- if ( hasWriteAspect && willRetryWrite ) {
213
+ if ( hasWriteAspect && willRetryWrite && session != null ) {
178
214
operation . options . willRetryWrite = true ;
179
215
session . incrementTransactionNumber ( ) ;
180
216
}
181
217
182
- try {
183
- return await operation . execute ( server , session ) ;
184
- } catch ( operationError ) {
185
- if ( willRetry && operationError instanceof MongoError ) {
186
- return await retryOperation ( operation , operationError , {
187
- session,
188
- topology,
189
- selector,
190
- previousServer : server . description
191
- } ) ;
192
- }
193
- throw operationError ;
194
- } finally {
195
- if ( session ?. owner != null && session . owner === owner ) {
196
- try {
197
- await session . endSession ( ) ;
198
- } catch ( error ) {
199
- squashError ( error ) ;
200
- }
201
- }
202
- }
203
- }
218
+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
219
+ const maxTries = willRetry ? 2 : 1 ;
220
+ let previousOperationError : MongoError | undefined ;
221
+ let previousServer : ServerDescription | undefined ;
204
222
205
- /** @internal */
206
- type RetryOptions = {
207
- session : ClientSession ;
208
- topology : Topology ;
209
- selector : ReadPreference | ServerSelector ;
210
- previousServer : ServerDescription ;
211
- } ;
223
+ // TODO(NODE-6231): implement infinite retry within CSOT timeout here
224
+ for ( let tries = 0 ; tries < maxTries ; tries ++ ) {
225
+ if ( previousOperationError ) {
226
+ if ( hasWriteAspect && previousOperationError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
227
+ throw new MongoServerError ( {
228
+ message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
229
+ errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
230
+ originalError : previousOperationError
231
+ } ) ;
232
+ }
212
233
213
- async function retryOperation <
214
- T extends AbstractOperation < TResult > ,
215
- TResult = ResultTypeFromOperation < T >
216
- > (
217
- operation : T ,
218
- originalError : MongoError ,
219
- { session, topology, selector, previousServer } : RetryOptions
220
- ) : Promise < TResult > {
221
- const isWriteOperation = operation . hasAspect ( Aspect . WRITE_OPERATION ) ;
222
- const isReadOperation = operation . hasAspect ( Aspect . READ_OPERATION ) ;
234
+ if ( hasWriteAspect && ! isRetryableWriteError ( previousOperationError ) )
235
+ throw previousOperationError ;
223
236
224
- if ( isWriteOperation && originalError . code === MMAPv1_RETRY_WRITES_ERROR_CODE ) {
225
- throw new MongoServerError ( {
226
- message : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
227
- errmsg : MMAPv1_RETRY_WRITES_ERROR_MESSAGE ,
228
- originalError
229
- } ) ;
230
- }
237
+ if ( hasReadAspect && ! isRetryableReadError ( previousOperationError ) )
238
+ throw previousOperationError ;
231
239
232
- if ( isWriteOperation && ! isRetryableWriteError ( originalError ) ) {
233
- throw originalError ;
234
- }
235
-
236
- if ( isReadOperation && ! isRetryableReadError ( originalError ) ) {
237
- throw originalError ;
238
- }
240
+ if (
241
+ previousOperationError instanceof MongoNetworkError &&
242
+ operation . hasAspect ( Aspect . CURSOR_CREATING ) &&
243
+ session != null &&
244
+ session . isPinned &&
245
+ ! session . inTransaction ( )
246
+ ) {
247
+ session . unpin ( { force : true , forceClear : true } ) ;
248
+ }
239
249
240
- if (
241
- originalError instanceof MongoNetworkError &&
242
- session . isPinned &&
243
- ! session . inTransaction ( ) &&
244
- operation . hasAspect ( Aspect . CURSOR_CREATING )
245
- ) {
246
- // If we have a cursor and the initial command fails with a network error,
247
- // we can retry it on another connection. So we need to check it back in, clear the
248
- // pool for the service id, and retry again.
249
- session . unpin ( { force : true , forceClear : true } ) ;
250
- }
250
+ server = await topology . selectServer ( selector , {
251
+ session,
252
+ operationName : operation . commandName ,
253
+ previousServer
254
+ } ) ;
251
255
252
- // select a new server, and attempt to retry the operation
253
- const server = await topology . selectServer ( selector , {
254
- session ,
255
- operationName : operation . commandName ,
256
- previousServer
257
- } ) ;
256
+ if ( hasWriteAspect && ! supportsRetryableWrites ( server ) ) {
257
+ throw new MongoUnexpectedServerResponseError (
258
+ 'Selected server does not support retryable writes'
259
+ ) ;
260
+ }
261
+ }
258
262
259
- if ( isWriteOperation && ! supportsRetryableWrites ( server ) ) {
260
- throw new MongoUnexpectedServerResponseError (
261
- 'Selected server does not support retryable writes'
262
- ) ;
263
- }
263
+ try {
264
+ return await operation . execute ( server , session ) ;
265
+ } catch ( operationError ) {
266
+ if ( ! ( operationError instanceof MongoError ) ) throw operationError ;
264
267
265
- try {
266
- return await operation . execute ( server , session ) ;
267
- } catch ( retryError ) {
268
- if (
269
- retryError instanceof MongoError &&
270
- retryError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
271
- ) {
272
- throw originalError ;
268
+ if (
269
+ previousOperationError != null &&
270
+ operationError . hasErrorLabel ( MongoErrorLabel . NoWritesPerformed )
271
+ ) {
272
+ throw previousOperationError ;
273
+ }
274
+ previousServer = server . description ;
275
+ previousOperationError = operationError ;
273
276
}
274
- throw retryError ;
275
277
}
278
+
279
+ throw previousOperationError ;
276
280
}
0 commit comments