forked from mongodb/mongo-java-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTimeoutContext.java
439 lines (381 loc) · 15.8 KB
/
TimeoutContext.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.internal;
import com.mongodb.MongoClientException;
import com.mongodb.MongoOperationTimeoutException;
import com.mongodb.internal.async.AsyncRunnable;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.CommandMessage;
import com.mongodb.internal.time.StartTime;
import com.mongodb.internal.time.Timeout;
import com.mongodb.lang.Nullable;
import com.mongodb.session.ClientSession;
import java.util.Objects;
import java.util.Optional;
import java.util.function.LongConsumer;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE;
import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* Timeout Context.
*
* <p>The context for handling timeouts in relation to the Client Side Operation Timeout specification.</p>
*/
public class TimeoutContext {
private final boolean isMaintenanceContext;
private final TimeoutSettings timeoutSettings;
@Nullable
private Timeout timeout;
@Nullable
private Timeout computedServerSelectionTimeout;
private long minRoundTripTimeMS = 0;
@Nullable
private MaxTimeSupplier maxTimeSupplier = null;
public static MongoOperationTimeoutException createMongoRoundTripTimeoutException() {
return createMongoTimeoutException("Remaining timeoutMS is less than or equal to the server's minimum round trip time.");
}
public static MongoOperationTimeoutException createMongoTimeoutException(final String message) {
return new MongoOperationTimeoutException(message);
}
public static <T> T throwMongoTimeoutException(final String message) {
throw new MongoOperationTimeoutException(message);
}
public static MongoOperationTimeoutException createMongoTimeoutException(final Throwable cause) {
return createMongoTimeoutException("Operation exceeded the timeout limit: " + cause.getMessage(), cause);
}
public static MongoOperationTimeoutException createMongoTimeoutException(final String message, final Throwable cause) {
if (cause instanceof MongoOperationTimeoutException) {
return (MongoOperationTimeoutException) cause;
}
return new MongoOperationTimeoutException(message, cause);
}
public static TimeoutContext createMaintenanceTimeoutContext(final TimeoutSettings timeoutSettings) {
return new TimeoutContext(true, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS()));
}
public static TimeoutContext createTimeoutContext(final ClientSession session, final TimeoutSettings timeoutSettings) {
TimeoutContext sessionTimeoutContext = session.getTimeoutContext();
if (sessionTimeoutContext != null) {
TimeoutSettings sessionTimeoutSettings = sessionTimeoutContext.timeoutSettings;
if (timeoutSettings.getGenerationId() > sessionTimeoutSettings.getGenerationId()) {
throw new MongoClientException("Cannot change the timeoutMS during a transaction.");
}
// Check for any legacy operation timeouts
if (sessionTimeoutSettings.getTimeoutMS() == null) {
if (timeoutSettings.getMaxTimeMS() != 0) {
sessionTimeoutSettings = sessionTimeoutSettings.withMaxTimeMS(timeoutSettings.getMaxTimeMS());
}
if (timeoutSettings.getMaxAwaitTimeMS() != 0) {
sessionTimeoutSettings = sessionTimeoutSettings.withMaxAwaitTimeMS(timeoutSettings.getMaxAwaitTimeMS());
}
if (timeoutSettings.getMaxCommitTimeMS() != null) {
sessionTimeoutSettings = sessionTimeoutSettings.withMaxCommitMS(timeoutSettings.getMaxCommitTimeMS());
}
return new TimeoutContext(sessionTimeoutSettings);
}
return sessionTimeoutContext;
}
return new TimeoutContext(timeoutSettings);
}
// Creates a copy of the timeout context that can be reset without resetting the original.
public TimeoutContext copyTimeoutContext() {
return new TimeoutContext(getTimeoutSettings(), getTimeout());
}
public TimeoutContext(final TimeoutSettings timeoutSettings) {
this(false, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS()));
}
private TimeoutContext(final TimeoutSettings timeoutSettings, @Nullable final Timeout timeout) {
this(false, timeoutSettings, timeout);
}
private TimeoutContext(final boolean isMaintenanceContext, final TimeoutSettings timeoutSettings, @Nullable final Timeout timeout) {
this.isMaintenanceContext = isMaintenanceContext;
this.timeoutSettings = timeoutSettings;
this.timeout = timeout;
}
/**
* Allows for the differentiation between users explicitly setting a global operation timeout via {@code timeoutMS}.
*
* @return true if a timeout has been set.
*/
public boolean hasTimeoutMS() {
return timeoutSettings.getTimeoutMS() != null;
}
/**
* Runs the runnable if the timeout is expired.
* @param onExpired the runnable to run
*/
public void onExpired(final Runnable onExpired) {
Timeout.nullAsInfinite(timeout).onExpired(onExpired);
}
/**
* Sets the recent min round trip time
* @param minRoundTripTimeMS the min round trip time
* @return this
*/
public TimeoutContext minRoundTripTimeMS(final long minRoundTripTimeMS) {
isTrue("'minRoundTripTimeMS' must be a positive number", minRoundTripTimeMS >= 0);
this.minRoundTripTimeMS = minRoundTripTimeMS;
return this;
}
@Nullable
public Timeout timeoutIncludingRoundTrip() {
return timeout == null ? null : timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS);
}
/**
* Returns the remaining {@code timeoutMS} if set or the {@code alternativeTimeoutMS}.
*
* @param alternativeTimeoutMS the alternative timeout.
* @return timeout to use.
*/
public long timeoutOrAlternative(final long alternativeTimeoutMS) {
if (timeout == null) {
return alternativeTimeoutMS;
} else {
return timeout.call(MILLISECONDS,
() -> 0L,
(ms) -> ms,
() -> throwMongoTimeoutException("The operation exceeded the timeout limit."));
}
}
public TimeoutSettings getTimeoutSettings() {
return timeoutSettings;
}
public long getMaxAwaitTimeMS() {
return timeoutSettings.getMaxAwaitTimeMS();
}
public void runMaxTimeMS(final LongConsumer onRemaining) {
if (maxTimeSupplier != null) {
runWithFixedTimeout(maxTimeSupplier.get(), onRemaining);
return;
}
if (timeout == null) {
runWithFixedTimeout(timeoutSettings.getMaxTimeMS(), onRemaining);
return;
}
assertNotNull(timeoutIncludingRoundTrip())
.run(MILLISECONDS,
() -> {},
onRemaining,
() -> {
throw createMongoRoundTripTimeoutException();
});
}
private static void runWithFixedTimeout(final long ms, final LongConsumer onRemaining) {
if (ms != 0) {
onRemaining.accept(ms);
}
}
public void resetToDefaultMaxTime() {
this.maxTimeSupplier = null;
}
/**
* The override will be provided as the remaining value in
* {@link #runMaxTimeMS}, where 0 is ignored. This is useful for setting timeout
* in {@link CommandMessage} as an extra element before we send it to the server.
*
* <p>
* NOTE: Suitable for static user-defined values only (i.e MaxAwaitTimeMS),
* not for running timeouts that adjust dynamically (CSOT).
*/
public void setMaxTimeOverride(final long maxTimeMS) {
this.maxTimeSupplier = () -> maxTimeMS;
}
/**
* Disable the maxTimeMS override. This way the maxTimeMS will not
* be appended to the command in the {@link CommandMessage}.
*/
public void disableMaxTimeOverride() {
this.maxTimeSupplier = () -> 0;
}
/**
* The override will be provided as the remaining value in
* {@link #runMaxTimeMS}, where 0 is ignored.
*/
public void setMaxTimeOverrideToMaxCommitTime() {
this.maxTimeSupplier = () -> getMaxCommitTimeMS();
}
@VisibleForTesting(otherwise = PRIVATE)
public long getMaxCommitTimeMS() {
Long maxCommitTimeMS = timeoutSettings.getMaxCommitTimeMS();
return timeoutOrAlternative(maxCommitTimeMS != null ? maxCommitTimeMS : 0);
}
public long getReadTimeoutMS() {
return timeoutOrAlternative(timeoutSettings.getReadTimeoutMS());
}
public long getWriteTimeoutMS() {
return timeoutOrAlternative(0);
}
public int getConnectTimeoutMs() {
final long connectTimeoutMS = getTimeoutSettings().getConnectTimeoutMS();
return Math.toIntExact(Timeout.nullAsInfinite(timeout).call(MILLISECONDS,
() -> connectTimeoutMS,
(ms) -> connectTimeoutMS == 0 ? ms : Math.min(ms, connectTimeoutMS),
() -> throwMongoTimeoutException("The operation exceeded the timeout limit.")));
}
/**
* @see #hasTimeoutMS()
* @see #doWithResetTimeout(Runnable)
* @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback)
*/
public void resetTimeoutIfPresent() {
getAndResetTimeoutIfPresent();
}
/**
* @see #hasTimeoutMS()
* @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()},
* i.e., iff it was reset.
*/
private Optional<Timeout> getAndResetTimeoutIfPresent() {
Timeout result = timeout;
if (hasTimeoutMS()) {
timeout = startTimeout(timeoutSettings.getTimeoutMS());
return ofNullable(result);
}
return empty();
}
/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final Runnable action) {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
try {
action.run();
} finally {
originalTimeout.ifPresent(original -> timeout = original);
}
}
/**
* @see #resetTimeoutIfPresent()
*/
public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback<Void> callback) {
beginAsync().thenRun(c -> {
Optional<Timeout> originalTimeout = getAndResetTimeoutIfPresent();
beginAsync().thenRun(c2 -> {
action.finish(c2);
}).thenAlwaysRunAndFinish(() -> {
originalTimeout.ifPresent(original -> timeout = original);
}, c);
}).finish(callback);
}
/**
* Resets the timeout if this timeout context is being used by pool maintenance
*/
public void resetMaintenanceTimeout() {
if (!isMaintenanceContext) {
return;
}
timeout = Timeout.nullAsInfinite(timeout).call(NANOSECONDS,
() -> timeout,
(ms) -> startTimeout(timeoutSettings.getTimeoutMS()),
() -> startTimeout(timeoutSettings.getTimeoutMS()));
}
public TimeoutContext withAdditionalReadTimeout(final int additionalReadTimeout) {
// Only used outside timeoutMS usage
assertNull(timeout);
// Check existing read timeout is infinite
if (timeoutSettings.getReadTimeoutMS() == 0) {
return this;
}
long newReadTimeout = getReadTimeoutMS() + additionalReadTimeout;
return new TimeoutContext(timeoutSettings.withReadTimeoutMS(newReadTimeout > 0 ? newReadTimeout : Long.MAX_VALUE));
}
@Override
public String toString() {
return "TimeoutContext{"
+ "isMaintenanceContext=" + isMaintenanceContext
+ ", timeoutSettings=" + timeoutSettings
+ ", timeout=" + timeout
+ ", minRoundTripTimeMS=" + minRoundTripTimeMS
+ '}';
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TimeoutContext that = (TimeoutContext) o;
return isMaintenanceContext == that.isMaintenanceContext
&& minRoundTripTimeMS == that.minRoundTripTimeMS
&& Objects.equals(timeoutSettings, that.timeoutSettings)
&& Objects.equals(timeout, that.timeout);
}
@Override
public int hashCode() {
return Objects.hash(isMaintenanceContext, timeoutSettings, timeout, minRoundTripTimeMS);
}
@Nullable
public static Timeout startTimeout(@Nullable final Long timeoutMS) {
if (timeoutMS != null) {
return Timeout.expiresIn(timeoutMS, MILLISECONDS, ZERO_DURATION_MEANS_INFINITE);
}
return null;
}
/**
* Returns the computed server selection timeout
*
* <p>Caches the computed server selection timeout if:
* <ul>
* <li>not in a maintenance context</li>
* <li>there is a timeoutMS, so to keep the same legacy behavior.</li>
* <li>the server selection timeout is less than the remaining overall timeout.</li>
* </ul>
*
* @return the timeout context
*/
public Timeout computeServerSelectionTimeout() {
Timeout serverSelectionTimeout = StartTime.now()
.timeoutAfterOrInfiniteIfNegative(getTimeoutSettings().getServerSelectionTimeoutMS(), MILLISECONDS);
if (isMaintenanceContext || !hasTimeoutMS()) {
return serverSelectionTimeout;
}
if (timeout != null && Timeout.earliest(serverSelectionTimeout, timeout) == timeout) {
return timeout;
}
computedServerSelectionTimeout = serverSelectionTimeout;
return computedServerSelectionTimeout;
}
/**
* Returns the timeout context to use for the handshake process
*
* @return a new timeout context with the cached computed server selection timeout if available or this
*/
public TimeoutContext withComputedServerSelectionTimeoutContext() {
if (this.hasTimeoutMS() && computedServerSelectionTimeout != null) {
return new TimeoutContext(false, timeoutSettings, computedServerSelectionTimeout);
}
return this;
}
public Timeout startWaitQueueTimeout(final StartTime checkoutStart) {
final long ms = getTimeoutSettings().getMaxWaitTimeMS();
return checkoutStart.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS);
}
@Nullable
public Timeout getTimeout() {
return timeout;
}
public interface MaxTimeSupplier {
long get();
}
}