diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 93df2a09922..4e63db8730c 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -17,6 +17,8 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.internal.async.AsyncRunnable; +import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.connection.CommandMessage; import com.mongodb.internal.time.StartTime; import com.mongodb.internal.time.Timeout; @@ -24,12 +26,16 @@ import com.mongodb.session.ClientSession; import java.util.Objects; +import java.util.Optional; import java.util.function.LongConsumer; import static com.mongodb.assertions.Assertions.assertNull; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.time.Timeout.ZeroSemantics.ZERO_DURATION_MEANS_INFINITE; +import static java.util.Optional.empty; +import static java.util.Optional.ofNullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -262,10 +268,53 @@ public int getConnectTimeoutMs() { () -> throwMongoTimeoutException("The operation exceeded the timeout limit."))); } + /** + * @see #hasTimeoutMS() + * @see #doWithResetTimeout(Runnable) + * @see #doWithResetTimeout(AsyncRunnable, SingleResultCallback) + */ public void resetTimeoutIfPresent() { + getAndResetTimeoutIfPresent(); + } + + /** + * @see #hasTimeoutMS() + * @return A {@linkplain Optional#isPresent() non-empty} previous {@linkplain Timeout} iff {@link #hasTimeoutMS()}, + * i.e., iff it was reset. + */ + private Optional getAndResetTimeoutIfPresent() { + Timeout result = timeout; if (hasTimeoutMS()) { timeout = startTimeout(timeoutSettings.getTimeoutMS()); + return ofNullable(result); } + return empty(); + } + + /** + * @see #resetTimeoutIfPresent() + */ + public void doWithResetTimeout(final Runnable action) { + Optional originalTimeout = getAndResetTimeoutIfPresent(); + try { + action.run(); + } finally { + originalTimeout.ifPresent(original -> timeout = original); + } + } + + /** + * @see #resetTimeoutIfPresent() + */ + public void doWithResetTimeout(final AsyncRunnable action, final SingleResultCallback callback) { + beginAsync().thenRun(c -> { + Optional originalTimeout = getAndResetTimeoutIfPresent(); + beginAsync().thenRun(c2 -> { + action.finish(c2); + }).thenAlwaysRunAndFinish(() -> { + originalTimeout.ifPresent(original -> timeout = original); + }, c); + }).finish(callback); } /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java index 56ca59d14ad..942721a27ad 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java @@ -17,6 +17,7 @@ package com.mongodb.internal.operation; import com.mongodb.MongoCommandException; +import com.mongodb.MongoException; import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoSocketException; @@ -51,6 +52,7 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.doesNotThrow; +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.operation.CommandBatchCursorHelper.FIRST_BATCH; import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR; import static com.mongodb.internal.operation.CommandBatchCursorHelper.NEXT_BATCH; @@ -63,16 +65,18 @@ class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor { private final MongoNamespace namespace; - private final long maxTimeMS; private final Decoder decoder; @Nullable private final BsonValue comment; private final int maxWireVersion; private final boolean firstBatchEmpty; private final ResourceManager resourceManager; + private final OperationContext operationContext; + private final TimeoutMode timeoutMode; private final AtomicBoolean processedInitial = new AtomicBoolean(); private int batchSize; private volatile CommandCursorResult commandCursorResult; + private boolean resetTimeoutWhenClosing; AsyncCommandBatchCursor( final TimeoutMode timeoutMode, @@ -86,24 +90,25 @@ class AsyncCommandBatchCursor implements AsyncAggregateResponseBatchCursor this.commandCursorResult = toCommandCursorResult(connectionDescription.getServerAddress(), FIRST_BATCH, commandCursorDocument); this.namespace = commandCursorResult.getNamespace(); this.batchSize = batchSize; - this.maxTimeMS = maxTimeMS; this.decoder = decoder; this.comment = comment; this.maxWireVersion = connectionDescription.getMaxWireVersion(); this.firstBatchEmpty = commandCursorResult.getResults().isEmpty(); + operationContext = connectionSource.getOperationContext(); + this.timeoutMode = timeoutMode; - connectionSource.getOperationContext().getTimeoutContext().setMaxTimeOverride(maxTimeMS); + operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); AsyncConnection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null; - resourceManager = new ResourceManager(timeoutMode, namespace, connectionSource, connectionToPin, - commandCursorResult.getServerCursor()); + resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor()); + resetTimeoutWhenClosing = true; } @Override public void next(final SingleResultCallback> callback) { resourceManager.execute(funcCallback -> { - resourceManager.checkTimeoutModeAndResetTimeoutContextIfIteration(); + checkTimeoutModeAndResetTimeoutContextIfIteration(); ServerCursor localServerCursor = resourceManager.getServerCursor(); boolean serverCursorIsNull = localServerCursor == null; List batchResults = emptyList(); @@ -168,6 +173,12 @@ public int getMaxWireVersion() { return maxWireVersion; } + void checkTimeoutModeAndResetTimeoutContextIfIteration() { + if (timeoutMode == TimeoutMode.ITERATION) { + operationContext.getTimeoutContext().resetTimeoutIfPresent(); + } + } + private void getMore(final ServerCursor cursor, final SingleResultCallback> callback) { resourceManager.executeWithConnection((connection, wrappedCallback) -> getMoreLoop(assertNotNull(connection), cursor, wrappedCallback), callback); @@ -216,21 +227,24 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA return commandCursorResult; } - void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) { - this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset); + /** + * Configures the cursor to {@link #close()} + * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}. + * This is useful when managing the {@link #close()} behavior externally. + */ + AsyncCommandBatchCursor disableTimeoutResetWhenClosing() { + resetTimeoutWhenClosing = false; + return this; } @ThreadSafe - private static final class ResourceManager extends CursorResourceManager { - + private final class ResourceManager extends CursorResourceManager { ResourceManager( - final TimeoutMode timeoutMode, final MongoNamespace namespace, final AsyncConnectionSource connectionSource, @Nullable final AsyncConnection connectionToPin, @Nullable final ServerCursor serverCursor) { - super(connectionSource.getOperationContext().getTimeoutContext(), timeoutMode, namespace, connectionSource, connectionToPin, - serverCursor); + super(namespace, connectionSource, connectionToPin, serverCursor); } /** @@ -244,7 +258,7 @@ void execute(final AsyncCallbackSupplier operation, final SingleResultCal } else { operation.whenComplete(() -> { endOperation(); - if (getServerCursor() == null) { + if (super.getServerCursor() == null) { // At this point all resources have been released, // but `isClose` may still be returning `false` if `close` have not been called. // Self-close to update the state managed by `ResourceManger`, and so that `isClosed` return `true`. @@ -261,23 +275,41 @@ void markAsPinned(final AsyncConnection connectionToPin, final Connection.Pinnin @Override void doClose() { - if (isSkipReleasingServerResourcesOnClose()) { - unsetServerCursor(); + TimeoutContext timeoutContext = operationContext.getTimeoutContext(); + timeoutContext.resetToDefaultMaxTime(); + SingleResultCallback thenDoNothing = (r, t) -> {}; + if (resetTimeoutWhenClosing) { + timeoutContext.doWithResetTimeout(this::releaseResourcesAsync, thenDoNothing); + } else { + releaseResourcesAsync(thenDoNothing); } + } - resetTimeout(); - if (getServerCursor() != null) { - getConnection((connection, t) -> { - if (connection != null) { - releaseServerAndClientResources(connection); - } else { - unsetServerCursor(); - releaseClientResources(); - } - }); - } else { + private void releaseResourcesAsync(final SingleResultCallback callback) { + beginAsync().thenRunTryCatchAsyncBlocks(c -> { + if (isSkipReleasingServerResourcesOnClose()) { + unsetServerCursor(); + } + if (super.getServerCursor() != null) { + beginAsync().thenSupply(c2 -> { + getConnection(c2); + }).thenConsume((connection, c3) -> { + beginAsync().thenRun(c4 -> { + releaseServerResourcesAsync(connection, c4); + }).thenAlwaysRunAndFinish(() -> { + connection.release(); + }, c3); + }).finish(c); + } else { + c.complete(c); + } + }, MongoException.class, (e, c5) -> { + c5.complete(c5); // ignore exceptions when releasing server resources + }).thenAlwaysRunAndFinish(() -> { + // guarantee that regardless of exceptions, `serverCursor` is null and client resources are released + unsetServerCursor(); releaseClientResources(); - } + }, callback); } void executeWithConnection(final AsyncCallableConnectionWithCallback callable, final SingleResultCallback callback) { @@ -314,25 +346,21 @@ private void getConnection(final SingleResultCallback callback) } } - private void releaseServerAndClientResources(final AsyncConnection connection) { - AsyncCallbackSupplier callbackSupplier = funcCallback -> { - ServerCursor localServerCursor = getServerCursor(); + private void releaseServerResourcesAsync(final AsyncConnection connection, final SingleResultCallback callback) { + beginAsync().thenRun((c) -> { + ServerCursor localServerCursor = super.getServerCursor(); if (localServerCursor != null) { - killServerCursor(getNamespace(), localServerCursor, connection, funcCallback); + killServerCursorAsync(getNamespace(), localServerCursor, connection, callback); + } else { + c.complete(c); } - }; - callbackSupplier.whenComplete(() -> { + }).thenAlwaysRunAndFinish(() -> { unsetServerCursor(); - releaseClientResources(); - }).whenComplete(connection::release).get((r, t) -> { /* do nothing */ }); + }, callback); } - private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor, + private void killServerCursorAsync(final MongoNamespace namespace, final ServerCursor localServerCursor, final AsyncConnection localConnection, final SingleResultCallback callback) { - OperationContext operationContext = assertNotNull(getConnectionSource()).getOperationContext(); - TimeoutContext timeoutContext = operationContext.getTimeoutContext(); - timeoutContext.resetToDefaultMaxTime(); - localConnection.commandAsync(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor), NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(), operationContext, (r, t) -> callback.onResult(null, null)); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 6231e98de12..4ef28c796cb 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -195,8 +195,8 @@ private AggregateOperationImpl getAggregateOperation(final Time @Override public BatchCursor execute(final ReadBinding binding) { TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); - CommandBatchCursor cursor = (CommandBatchCursor) getAggregateOperation(timeoutContext).execute(binding); - cursor.setCloseWithoutTimeoutReset(true); + CommandBatchCursor cursor = ((CommandBatchCursor) getAggregateOperation(timeoutContext).execute(binding)) + .disableTimeoutResetWhenClosing(); return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), @@ -210,8 +210,8 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb if (t != null) { callback.onResult(null, t); } else { - AsyncCommandBatchCursor cursor = (AsyncCommandBatchCursor) assertNotNull(result); - cursor.setCloseWithoutTimeoutReset(true); + AsyncCommandBatchCursor cursor = ((AsyncCommandBatchCursor) assertNotNull(result)) + .disableTimeoutResetWhenClosing(); callback.onResult(new AsyncChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, setChangeStreamOptions(cursor.getPostBatchResumeToken(), cursor.getOperationTime(), diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java index 3ac893d3178..d201976e5ed 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java @@ -67,11 +67,14 @@ class CommandBatchCursor implements AggregateResponseBatchCursor { private final int maxWireVersion; private final boolean firstBatchEmpty; private final ResourceManager resourceManager; + private final OperationContext operationContext; + private final TimeoutMode timeoutMode; private int batchSize; private CommandCursorResult commandCursorResult; @Nullable private List nextBatch; + private boolean resetTimeoutWhenClosing; CommandBatchCursor( final TimeoutMode timeoutMode, @@ -89,12 +92,14 @@ class CommandBatchCursor implements AggregateResponseBatchCursor { this.comment = comment; this.maxWireVersion = connectionDescription.getMaxWireVersion(); this.firstBatchEmpty = commandCursorResult.getResults().isEmpty(); + operationContext = connectionSource.getOperationContext(); + this.timeoutMode = timeoutMode; - connectionSource.getOperationContext().getTimeoutContext().setMaxTimeOverride(maxTimeMS); + operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS); Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null; - resourceManager = new ResourceManager(timeoutMode, namespace, connectionSource, connectionToPin, - commandCursorResult.getServerCursor()); + resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor()); + resetTimeoutWhenClosing = true; } @Override @@ -107,7 +112,7 @@ private boolean doHasNext() { return true; } - resourceManager.checkTimeoutModeAndResetTimeoutContextIfIteration(); + checkTimeoutModeAndResetTimeoutContextIfIteration(); while (resourceManager.getServerCursor() != null) { getMore(); if (!resourceManager.operable()) { @@ -228,6 +233,12 @@ public int getMaxWireVersion() { return maxWireVersion; } + void checkTimeoutModeAndResetTimeoutContextIfIteration() { + if (timeoutMode == TimeoutMode.ITERATION) { + operationContext.getTimeoutContext().resetTimeoutIfPresent(); + } + } + private void getMore() { ServerCursor serverCursor = assertNotNull(resourceManager.getServerCursor()); resourceManager.executeWithConnection(connection -> { @@ -259,26 +270,23 @@ private CommandCursorResult toCommandCursorResult(final ServerAddress serverA } /** - * Configures the cursor's behavior to close without resetting its timeout. If {@code true}, the cursor attempts to close immediately - * without resetting its {@link TimeoutContext#getTimeout()} if present. This is useful when managing the cursor's close behavior externally. - * - * @param closeWithoutTimeoutReset + * Configures the cursor to {@link #close()} + * without {@linkplain TimeoutContext#resetTimeoutIfPresent() resetting} its {@linkplain TimeoutContext#getTimeout() timeout}. + * This is useful when managing the {@link #close()} behavior externally. */ - void setCloseWithoutTimeoutReset(final boolean closeWithoutTimeoutReset) { - this.resourceManager.setCloseWithoutTimeoutReset(closeWithoutTimeoutReset); + CommandBatchCursor disableTimeoutResetWhenClosing() { + resetTimeoutWhenClosing = false; + return this; } @ThreadSafe - private static final class ResourceManager extends CursorResourceManager { - + private final class ResourceManager extends CursorResourceManager { ResourceManager( - final TimeoutMode timeoutMode, final MongoNamespace namespace, final ConnectionSource connectionSource, @Nullable final Connection connectionToPin, @Nullable final ServerCursor serverCursor) { - super(connectionSource.getOperationContext().getTimeoutContext(), timeoutMode, namespace, connectionSource, connectionToPin, - serverCursor); + super(namespace, connectionSource, connectionToPin, serverCursor); } /** @@ -306,12 +314,21 @@ void markAsPinned(final Connection connectionToPin, final Connection.PinningMode @Override void doClose() { - if (isSkipReleasingServerResourcesOnClose()) { - unsetServerCursor(); + TimeoutContext timeoutContext = operationContext.getTimeoutContext(); + timeoutContext.resetToDefaultMaxTime(); + if (resetTimeoutWhenClosing) { + timeoutContext.doWithResetTimeout(this::releaseResources); + } else { + releaseResources(); } - resetTimeout(); + } + + private void releaseResources() { try { - if (getServerCursor() != null) { + if (isSkipReleasingServerResourcesOnClose()) { + unsetServerCursor(); + } + if (super.getServerCursor() != null) { Connection connection = getConnection(); try { releaseServerResources(connection); @@ -358,7 +375,7 @@ private Connection getConnection() { private void releaseServerResources(final Connection connection) { try { - ServerCursor localServerCursor = getServerCursor(); + ServerCursor localServerCursor = super.getServerCursor(); if (localServerCursor != null) { killServerCursor(getNamespace(), localServerCursor, connection); } @@ -369,10 +386,6 @@ private void releaseServerResources(final Connection connection) { private void killServerCursor(final MongoNamespace namespace, final ServerCursor localServerCursor, final Connection localConnection) { - OperationContext operationContext = assertNotNull(getConnectionSource()).getOperationContext(); - TimeoutContext timeoutContext = operationContext.getTimeoutContext(); - timeoutContext.resetToDefaultMaxTime(); - localConnection.command(namespace.getDatabaseName(), getKillCursorsCommand(namespace, localServerCursor), NoOpFieldNameValidator.INSTANCE, ReadPreference.primary(), new BsonDocumentCodec(), operationContext); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java index 78529cfda44..0fbdf512dab 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CursorResourceManager.java @@ -20,8 +20,6 @@ import com.mongodb.MongoSocketException; import com.mongodb.ServerCursor; import com.mongodb.annotations.ThreadSafe; -import com.mongodb.client.cursor.TimeoutMode; -import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.binding.ReferenceCounted; import com.mongodb.internal.connection.Connection; import com.mongodb.lang.Nullable; @@ -56,8 +54,6 @@ @ThreadSafe abstract class CursorResourceManager { private final Lock lock; - private final TimeoutContext timeoutContext; - private final TimeoutMode timeoutMode; private final MongoNamespace namespace; private volatile State state; @Nullable @@ -67,18 +63,13 @@ abstract class CursorResourceManager(new BsonArray()))); private static final Decoder DOCUMENT_CODEC = new DocumentCodec(); + private static final Duration TIMEOUT = Duration.ofMillis(3_000); private AsyncConnection mockConnection; @@ -86,7 +96,8 @@ void setUp() { connectionSource = mock(AsyncConnectionSource.class); operationContext = mock(OperationContext.class); - timeoutContext = mock(TimeoutContext.class); + timeoutContext = new TimeoutContext(TimeoutSettings.create( + MongoClientSettings.builder().timeout(TIMEOUT.toMillis(), MILLISECONDS).build())); serverDescription = mock(ServerDescription.class); when(operationContext.getTimeoutContext()).thenReturn(timeoutContext); when(connectionSource.getOperationContext()).thenReturn(operationContext); @@ -108,7 +119,7 @@ void shouldSkipKillsCursorsCommandWhenNetworkErrorOccurs() { return null; }).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any()); when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER); - AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(); + AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(0); //when commandBatchCursor.next((result, t) -> { @@ -132,9 +143,8 @@ void shouldNotSkipKillsCursorsCommandWhenTimeoutExceptionDoesNotHaveNetworkError return null; }).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any()); when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER); - when(timeoutContext.hasTimeoutMS()).thenReturn(true); - AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(); + AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(0); //when commandBatchCursor.next((result, t) -> { @@ -164,9 +174,8 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() { return null; }).when(mockConnection).commandAsync(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any(), any()); when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER); - when(timeoutContext.hasTimeoutMS()).thenReturn(true); - AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(); + AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(0); //when commandBatchCursor.next((result, t) -> { @@ -186,13 +195,69 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() { argThat(bsonDocument -> bsonDocument.containsKey("killCursors")), any(), any(), any(), any(), any()); } + @Test + void closeShouldResetTimeoutContextToDefaultMaxTime() { + long maxTimeMS = 10; + com.mongodb.assertions.Assertions.assertTrue(maxTimeMS < TIMEOUT.toMillis()); + try (AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(maxTimeMS)) { + // verify that the `maxTimeMS` override was applied + timeoutContext.runMaxTimeMS(remainingMillis -> assertTrue(remainingMillis <= maxTimeMS)); + } catch (Exception e) { + throw new RuntimeException(e); + } + timeoutContext.runMaxTimeMS(remainingMillis -> { + // verify that the `maxTimeMS` override was reset + assertTrue(remainingMillis > maxTimeMS); + assertTrue(remainingMillis <= TIMEOUT.toMillis()); + }); + } - private AsyncCommandBatchCursor createBatchCursor() { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void closeShouldNotResetOriginalTimeout(final boolean disableTimeoutResetWhenClosing) { + doAnswer(invocation -> { + SingleResultCallback argument = invocation.getArgument(6); + argument.onResult(null, null); + return null; + }).when(mockConnection).commandAsync(any(), any(), any(), any(), any(), any(), any()); + Duration thirdOfTimeout = TIMEOUT.dividedBy(3); + com.mongodb.assertions.Assertions.assertTrue(thirdOfTimeout.toMillis() > 0); + try (AsyncCommandBatchCursor commandBatchCursor = createBatchCursor(0)) { + if (disableTimeoutResetWhenClosing) { + commandBatchCursor.disableTimeoutResetWhenClosing(); + } + try { + Thread.sleep(thirdOfTimeout.toMillis()); + } catch (InterruptedException e) { + throw interruptAndCreateMongoInterruptedException(null, e); + } + when(mockConnection.release()).then(invocation -> { + Thread.sleep(thirdOfTimeout.toMillis()); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + verify(mockConnection, times(1)).release(); + // at this point at least (2 * thirdOfTimeout) have passed + com.mongodb.assertions.Assertions.assertNotNull(timeoutContext.getTimeout()).run( + MILLISECONDS, + com.mongodb.assertions.Assertions::fail, + remainingMillis -> { + // Verify that the original timeout has not been intact. + // If `close` had reset it, we would have observed more than `thirdOfTimeout` left. + assertTrue(remainingMillis <= thirdOfTimeout.toMillis()); + }, + Assertions::fail); + } + + + private AsyncCommandBatchCursor createBatchCursor(final long maxTimeMS) { return new AsyncCommandBatchCursor( TimeoutMode.CURSOR_LIFETIME, COMMAND_CURSOR_DOCUMENT, 0, - 0, + maxTimeMS, DOCUMENT_CODEC, null, connectionSource, diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 611b90fc675..65636e2f842 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -15,6 +15,7 @@ */ package com.mongodb.internal.async; +import com.mongodb.MongoException; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.TimeoutSettings; import org.junit.jupiter.api.Test; @@ -287,6 +288,30 @@ void testConditionals() { async(3, c); }).finish(callback); }); + + // empty `else` branch + assertBehavesSameVariations(5, + () -> { + if (plainTest(1)) { + Integer connection = syncReturns(2); + sync(connection + 5); + } else { + // do nothing + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + if (plainTest(1)) { + beginAsync().thenSupply(c2 -> { + asyncReturns(2, c2); + }).thenConsume((connection, c3) -> { + async(connection + 5, c3); + }).finish(c); + } else { + c.complete(c); // do nothing + } + }).finish(callback); + }); } @Test @@ -480,6 +505,36 @@ void testTryCatch() { }); } + @Test + void testTryWithEmptyCatch() { + assertBehavesSameVariations(2, + () -> { + try { + throw new RuntimeException(); + } catch (MongoException e) { + // ignore exceptions + } finally { + plain(2); + } + plain(3); + }, + (callback) -> { + beginAsync().thenRun(c -> { + beginAsync().thenRunTryCatchAsyncBlocks(c2 -> { + c2.completeExceptionally(new RuntimeException()); + }, MongoException.class, (e, c3) -> { + c3.complete(c3); // ignore exceptions + }) + .thenAlwaysRunAndFinish(() -> { + plain(2); + }, c); + }).thenRun(c4 -> { + plain(3); + c4.complete(c4); + }).finish(callback); + }); + } + @Test void testTryCatchHelper() { assertBehavesSameVariations(4, @@ -694,7 +749,7 @@ void testRetryLoop() { } @Test - void testFinally() { + void testFinallyWithPlainInsideTry() { // (in try: normal flow + exception + exception) * (in finally: normal + exception) = 6 assertBehavesSameVariations(6, () -> { @@ -715,6 +770,29 @@ void testFinally() { }); } + @Test + void testFinallyWithPlainOutsideTry() { + assertBehavesSameVariations(5, + () -> { + plain(1); + try { + sync(2); + } finally { + plain(3); + } + }, + (callback) -> { + beginAsync().thenRun(c -> { + plain(1); + beginAsync().thenRun(c2 -> { + async(2, c2); + }).thenAlwaysRunAndFinish(() -> { + plain(3); + }, c); + }).finish(callback); + }); + } + @Test void testUsedAsLambda() { assertBehavesSameVariations(4, diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy index 4ea54c05ed0..d2bcd0804bb 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncCommandBatchCursorSpecification.groovy @@ -16,6 +16,7 @@ package com.mongodb.internal.operation +import com.mongodb.MongoClientSettings import com.mongodb.MongoCommandException import com.mongodb.MongoException import com.mongodb.MongoNamespace @@ -29,6 +30,7 @@ import com.mongodb.connection.ServerDescription import com.mongodb.connection.ServerType import com.mongodb.connection.ServerVersion import com.mongodb.internal.TimeoutContext +import com.mongodb.internal.TimeoutSettings import com.mongodb.internal.async.SingleResultCallback import com.mongodb.internal.binding.AsyncConnectionSource import com.mongodb.internal.connection.AsyncConnection @@ -42,6 +44,8 @@ import org.bson.Document import org.bson.codecs.DocumentCodec import spock.lang.Specification +import java.util.concurrent.TimeUnit + import static OperationUnitSpecification.getMaxWireVersionForServerVersion import static com.mongodb.ReadPreference.primary import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR @@ -520,7 +524,9 @@ class AsyncCommandBatchCursorSpecification extends Specification { .build() } OperationContext operationContext = Mock(OperationContext) - operationContext.getTimeoutContext() >> Mock(TimeoutContext) + def timeoutContext = Spy(new TimeoutContext(TimeoutSettings.create( + MongoClientSettings.builder().timeout(3, TimeUnit.SECONDS).build()))) + operationContext.getTimeoutContext() >> timeoutContext mock.getOperationContext() >> operationContext mock.getConnection(_) >> { if (counter == 0) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorSpecification.groovy index 72e9e135b42..c95a119134a 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorSpecification.groovy @@ -16,6 +16,7 @@ package com.mongodb.internal.operation +import com.mongodb.MongoClientSettings import com.mongodb.MongoCommandException import com.mongodb.MongoException import com.mongodb.MongoNamespace @@ -30,6 +31,7 @@ import com.mongodb.connection.ServerDescription import com.mongodb.connection.ServerType import com.mongodb.connection.ServerVersion import com.mongodb.internal.TimeoutContext +import com.mongodb.internal.TimeoutSettings import com.mongodb.internal.binding.ConnectionSource import com.mongodb.internal.connection.Connection import com.mongodb.internal.connection.OperationContext @@ -42,6 +44,8 @@ import org.bson.Document import org.bson.codecs.DocumentCodec import spock.lang.Specification +import java.util.concurrent.TimeUnit + import static com.mongodb.ReadPreference.primary import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CLOSED_AS_CURSOR import static com.mongodb.internal.operation.CommandBatchCursorHelper.MESSAGE_IF_CONCURRENT_OPERATION @@ -570,7 +574,9 @@ class CommandBatchCursorSpecification extends Specification { .build() } OperationContext operationContext = Mock(OperationContext) - operationContext.getTimeoutContext() >> Mock(TimeoutContext) + def timeoutContext = Spy(new TimeoutContext(TimeoutSettings.create( + MongoClientSettings.builder().timeout(3, TimeUnit.SECONDS).build()))) + operationContext.getTimeoutContext() >> timeoutContext mock.getOperationContext() >> operationContext mock.getConnection() >> { if (counter == 0) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorTest.java index 3380785bd70..52e88c40af0 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/CommandBatchCursorTest.java @@ -16,7 +16,7 @@ package com.mongodb.internal.operation; - +import com.mongodb.MongoClientSettings; import com.mongodb.MongoNamespace; import com.mongodb.MongoOperationTimeoutException; import com.mongodb.MongoSocketException; @@ -27,6 +27,7 @@ import com.mongodb.connection.ServerType; import com.mongodb.connection.ServerVersion; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.OperationContext; @@ -41,8 +42,16 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.time.Duration; import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -63,7 +72,7 @@ class CommandBatchCursorTest { .append("firstBatch", new BsonArrayWrapper<>(new BsonArray()))); private static final Decoder DOCUMENT_CODEC = new DocumentCodec(); - + private static final Duration TIMEOUT = Duration.ofMillis(3_000); private Connection mockConnection; private ConnectionDescription mockDescription; @@ -85,7 +94,8 @@ void setUp() { connectionSource = mock(ConnectionSource.class); operationContext = mock(OperationContext.class); - timeoutContext = mock(TimeoutContext.class); + timeoutContext = new TimeoutContext(TimeoutSettings.create( + MongoClientSettings.builder().timeout(TIMEOUT.toMillis(), MILLISECONDS).build())); serverDescription = mock(ServerDescription.class); when(operationContext.getTimeoutContext()).thenReturn(timeoutContext); when(connectionSource.getOperationContext()).thenReturn(operationContext); @@ -101,21 +111,21 @@ void shouldSkipKillsCursorsCommandWhenNetworkErrorOccurs() { new MongoSocketException("test", new ServerAddress())); when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER); - CommandBatchCursor commandBatchCursor = createBatchCursor(); + CommandBatchCursor commandBatchCursor = createBatchCursor(0); //when - Assertions.assertThrows(MongoSocketException.class, commandBatchCursor::next); + assertThrows(MongoSocketException.class, commandBatchCursor::next); //then commandBatchCursor.close(); verify(mockConnection, times(1)).command(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any()); } - private CommandBatchCursor createBatchCursor() { + private CommandBatchCursor createBatchCursor(final long maxTimeMS) { return new CommandBatchCursor<>( TimeoutMode.CURSOR_LIFETIME, COMMAND_CURSOR_DOCUMENT, 0, - 0, + maxTimeMS, DOCUMENT_CODEC, null, connectionSource, @@ -128,12 +138,11 @@ void shouldNotSkipKillsCursorsCommandWhenTimeoutExceptionDoesNotHaveNetworkError when(mockConnection.command(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any())).thenThrow( new MongoOperationTimeoutException("test")); when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER); - when(timeoutContext.hasTimeoutMS()).thenReturn(true); - CommandBatchCursor commandBatchCursor = createBatchCursor(); + CommandBatchCursor commandBatchCursor = createBatchCursor(0); //when - Assertions.assertThrows(MongoOperationTimeoutException.class, commandBatchCursor::next); + assertThrows(MongoOperationTimeoutException.class, commandBatchCursor::next); commandBatchCursor.close(); @@ -153,12 +162,11 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() { when(mockConnection.command(eq(NAMESPACE.getDatabaseName()), any(), any(), any(), any(), any())).thenThrow( new MongoOperationTimeoutException("test", new MongoSocketException("test", new ServerAddress()))); when(serverDescription.getType()).thenReturn(ServerType.LOAD_BALANCER); - when(timeoutContext.hasTimeoutMS()).thenReturn(true); - CommandBatchCursor commandBatchCursor = createBatchCursor(); + CommandBatchCursor commandBatchCursor = createBatchCursor(0); //when - Assertions.assertThrows(MongoOperationTimeoutException.class, commandBatchCursor::next); + assertThrows(MongoOperationTimeoutException.class, commandBatchCursor::next); commandBatchCursor.close(); //then @@ -169,4 +177,55 @@ void shouldSkipKillsCursorsCommandWhenTimeoutExceptionHaveNetworkErrorCause() { verify(mockConnection, never()).command(eq(NAMESPACE.getDatabaseName()), argThat(bsonDocument -> bsonDocument.containsKey("killCursors")), any(), any(), any(), any()); } + + @Test + void closeShouldResetTimeoutContextToDefaultMaxTime() { + long maxTimeMS = 10; + com.mongodb.assertions.Assertions.assertTrue(maxTimeMS < TIMEOUT.toMillis()); + try (CommandBatchCursor commandBatchCursor = createBatchCursor(maxTimeMS)) { + // verify that the `maxTimeMS` override was applied + timeoutContext.runMaxTimeMS(remainingMillis -> assertTrue(remainingMillis <= maxTimeMS)); + } catch (Exception e) { + throw new RuntimeException(e); + } + timeoutContext.runMaxTimeMS(remainingMillis -> { + // verify that the `maxTimeMS` override was reset + assertTrue(remainingMillis > maxTimeMS); + assertTrue(remainingMillis <= TIMEOUT.toMillis()); + }); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + void closeShouldNotResetOriginalTimeout(final boolean disableTimeoutResetWhenClosing) { + Duration thirdOfTimeout = TIMEOUT.dividedBy(3); + com.mongodb.assertions.Assertions.assertTrue(thirdOfTimeout.toMillis() > 0); + try (CommandBatchCursor commandBatchCursor = createBatchCursor(0)) { + if (disableTimeoutResetWhenClosing) { + commandBatchCursor.disableTimeoutResetWhenClosing(); + } + try { + Thread.sleep(thirdOfTimeout.toMillis()); + } catch (InterruptedException e) { + throw interruptAndCreateMongoInterruptedException(null, e); + } + when(mockConnection.release()).then(invocation -> { + Thread.sleep(thirdOfTimeout.toMillis()); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + verify(mockConnection, times(1)).release(); + // at this point at least (2 * thirdOfTimeout) have passed + com.mongodb.assertions.Assertions.assertNotNull(timeoutContext.getTimeout()).run( + MILLISECONDS, + com.mongodb.assertions.Assertions::fail, + remainingMillis -> { + // Verify that the original timeout has not been intact. + // If `close` had reset it, we would have observed more than `thirdOfTimeout` left. + assertTrue(remainingMillis <= thirdOfTimeout.toMillis()); + }, + Assertions::fail); + } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/CursorResourceManagerTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/CursorResourceManagerTest.java index d631daf2e21..15a8bd972f1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/CursorResourceManagerTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/CursorResourceManagerTest.java @@ -15,10 +15,8 @@ */ package com.mongodb.internal.operation; -import com.mongodb.ClusterFixture; import com.mongodb.MongoNamespace; import com.mongodb.ServerCursor; -import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.ReferenceCounted; import com.mongodb.internal.connection.Connection; @@ -32,8 +30,6 @@ final class CursorResourceManagerTest { @Test void doubleCloseExecutedConcurrentlyWithOperationBeingInProgressShouldNotFail() { CursorResourceManager cursorResourceManager = new CursorResourceManager( - ClusterFixture.OPERATION_CONTEXT.getTimeoutContext(), - TimeoutMode.CURSOR_LIFETIME, new MongoNamespace("db", "coll"), MongoMockito.mock(AsyncConnectionSource.class, mock -> { when(mock.retain()).thenReturn(mock);