Skip to content

Add AsyncTransportSettings, ExecutorService #1489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Nov 21, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*
*/
package com.mongodb.connection;

import com.mongodb.lang.Nullable;

import java.util.concurrent.ExecutorService;

import static com.mongodb.assertions.Assertions.notNull;

/**
* {@link TransportSettings} for a non-<a href="http://netty.io/">Netty</a>-based async transport implementation.
* Shallowly immutable.
*
* @since 5.2
*/
public final class AsyncTransportSettings extends TransportSettings {

private final ExecutorService executorService;

private AsyncTransportSettings(final Builder builder) {
this.executorService = builder.executorService;
}

static Builder builder() {
return new Builder();
}

/**
* A builder for an instance of {@link AsyncTransportSettings}
*/
public static final class Builder {

private ExecutorService executorService;

private Builder() {
}

/**
* The executor service, intended to be used exclusively by the mongo
* client. Closing the mongo client will result in {@linkplain ExecutorService#shutdown() orderly shutdown}
* of the executor service.
*
* <p>When {@linkplain SslSettings#isEnabled() TLS is not enabled}, see
* {@link java.nio.channels.AsynchronousChannelGroup#withThreadPool(ExecutorService)}
* for additional requirements for the executor service.
*
* @param executorService the executor service
* @return this
* @see #getExecutorService()
*/
public Builder executorService(final ExecutorService executorService) {
this.executorService = notNull("executorService", executorService);
return this;
}

/**
* Build an instance of {@link AsyncTransportSettings}
* @return an instance of {@link AsyncTransportSettings}
*/
public AsyncTransportSettings build() {
return new AsyncTransportSettings(this);
}
}

/**
* Gets the executor service
*
* @return the executor service
* @see Builder#executorService(ExecutorService)
*/
@Nullable
public ExecutorService getExecutorService() {
return executorService;
}

@Override
public String toString() {
return "AsyncTransportSettings{"
+ "executorService=" + executorService
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.mongodb.connection;

import com.mongodb.annotations.Immutable;
import com.mongodb.lang.Nullable;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.EventLoopGroup;
Expand All @@ -33,10 +32,10 @@

/**
* {@code TransportSettings} for a <a href="http://netty.io/">Netty</a>-based transport implementation.
* Shallowly immutable.
*
* @since 4.11
*/
@Immutable
public final class NettyTransportSettings extends TransportSettings {

private final EventLoopGroup eventLoopGroup;
Expand Down Expand Up @@ -137,7 +136,7 @@ public Builder sslContext(final SslContext sslContext) {
/**
* Build an instance of {@code NettyTransportSettings}.
*
* @return factory for {@code NettyTransportSettings}
* @return an instance of {@code NettyTransportSettings}
*/
public NettyTransportSettings build() {
return new NettyTransportSettings(this);
Expand Down
10 changes: 10 additions & 0 deletions driver-core/src/main/com/mongodb/connection/TransportSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,14 @@ public abstract class TransportSettings {
public static NettyTransportSettings.Builder nettyBuilder() {
return NettyTransportSettings.builder();
}

/**
* A builder for {@link AsyncTransportSettings}.
*
* @return a builder for {@link AsyncTransportSettings}
* @since 5.2
*/
public static AsyncTransportSettings.Builder asyncBuilder() {
return AsyncTransportSettings.builder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.SocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
Expand All @@ -46,13 +47,24 @@ public final class AsynchronousSocketChannelStream extends AsynchronousChannelSt
private final ServerAddress serverAddress;
private final InetAddressResolver inetAddressResolver;
private final SocketSettings settings;
@Nullable
private final AsynchronousChannelGroup group;

public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
AsynchronousSocketChannelStream(
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider) {
this(serverAddress, inetAddressResolver, settings, bufferProvider, null);
}

public AsynchronousSocketChannelStream(
final ServerAddress serverAddress, final InetAddressResolver inetAddressResolver,
final SocketSettings settings, final PowerOfTwoBufferPool bufferProvider,
@Nullable final AsynchronousChannelGroup group) {
super(serverAddress, settings, bufferProvider);
this.serverAddress = serverAddress;
this.inetAddressResolver = inetAddressResolver;
this.settings = settings;
this.group = group;
}

@Override
Expand All @@ -77,7 +89,10 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
SocketAddress socketAddress = socketAddressQueue.poll();

try {
AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open();
AsynchronousSocketChannel attemptConnectionChannel;
attemptConnectionChannel = group == null
? AsynchronousSocketChannel.open()
: AsynchronousSocketChannel.open(group);
attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
if (settings.getReceiveBufferSize() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.mongodb.ServerAddress;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;

import java.nio.channels.AsynchronousChannelGroup;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.notNull;

Expand All @@ -31,23 +34,34 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
private final SocketSettings settings;
private final InetAddressResolver inetAddressResolver;
@Nullable
private final AsynchronousChannelGroup group;

/**
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
*
* @param settings the settings for the connection to a MongoDB server
* @param sslSettings the settings for connecting via SSL
*/
public AsynchronousSocketChannelStreamFactory(final InetAddressResolver inetAddressResolver, final SocketSettings settings,
public AsynchronousSocketChannelStreamFactory(
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
final SslSettings sslSettings) {
this(inetAddressResolver, settings, sslSettings, null);
}

AsynchronousSocketChannelStreamFactory(
final InetAddressResolver inetAddressResolver, final SocketSettings settings,
final SslSettings sslSettings, @Nullable final AsynchronousChannelGroup group) {
assertFalse(sslSettings.isEnabled());
this.inetAddressResolver = inetAddressResolver;
this.settings = notNull("settings", settings);
this.group = group;
}

@Override
public Stream create(final ServerAddress serverAddress) {
return new AsynchronousSocketChannelStream(serverAddress, inetAddressResolver, settings, bufferProvider);
return new AsynchronousSocketChannelStream(
serverAddress, inetAddressResolver, settings, bufferProvider, group);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,42 @@

import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.SslSettings;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.InetAddressResolver;

import java.nio.channels.AsynchronousChannelGroup;

/**
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
*
* @see java.nio.channels.AsynchronousSocketChannel
*/
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
private final InetAddressResolver inetAddressResolver;
@Nullable
private final AsynchronousChannelGroup group;

public AsynchronousSocketChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
this(inetAddressResolver, null);
}

AsynchronousSocketChannelStreamFactoryFactory(
final InetAddressResolver inetAddressResolver,
@Nullable final AsynchronousChannelGroup group) {
this.inetAddressResolver = inetAddressResolver;
this.group = group;
}

@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
return new AsynchronousSocketChannelStreamFactory(inetAddressResolver, socketSettings, sslSettings);
return new AsynchronousSocketChannelStreamFactory(
inetAddressResolver, socketSettings, sslSettings, group);
}

@Override
public void close() {
if (group != null) {
group.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,72 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoClientException;
import com.mongodb.MongoClientSettings;
import com.mongodb.connection.AsyncTransportSettings;
import com.mongodb.connection.NettyTransportSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.connection.TransportSettings;
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
import com.mongodb.spi.dns.InetAddressResolver;

import java.io.IOException;
import java.nio.channels.AsynchronousChannelGroup;
import java.util.concurrent.ExecutorService;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class StreamFactoryHelper {
public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings,

public static StreamFactory getSyncStreamFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver, final SocketSettings socketSettings) {
TransportSettings transportSettings = settings.getTransportSettings();
if (transportSettings == null) {
return new SocketStreamFactory(inetAddressResolver, socketSettings, settings.getSslSettings());
} else if (transportSettings instanceof AsyncTransportSettings) {
throw new MongoClientException("Unsupported transport settings in sync: " + transportSettings.getClass().getName());
} else if (transportSettings instanceof NettyTransportSettings) {
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings)
.create(socketSettings, settings.getSslSettings());
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
}

public static StreamFactoryFactory getAsyncStreamFactoryFactory(final MongoClientSettings settings,
final InetAddressResolver inetAddressResolver) {
if (transportSettings instanceof NettyTransportSettings) {
return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings)
.inetAddressResolver(inetAddressResolver)
.build();
TransportSettings transportSettings = settings.getTransportSettings();
if (transportSettings == null || transportSettings instanceof AsyncTransportSettings) {
ExecutorService executorService = transportSettings == null
? null
: ((AsyncTransportSettings) transportSettings).getExecutorService();
if (settings.getSslSettings().isEnabled()) {
return new TlsChannelStreamFactoryFactory(inetAddressResolver, executorService);
}
AsynchronousChannelGroup group = null;
if (executorService != null) {
try {
group = AsynchronousChannelGroup.withThreadPool(executorService);
} catch (IOException e) {
throw new MongoClientException("Unable to create an asynchronous channel group", e);
}
}
return new AsynchronousSocketChannelStreamFactoryFactory(inetAddressResolver, group);
} else if (transportSettings instanceof NettyTransportSettings) {
return getNettyStreamFactoryFactory(inetAddressResolver, (NettyTransportSettings) transportSettings);
} else {
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
}
}

private static NettyStreamFactoryFactory getNettyStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
final NettyTransportSettings transportSettings) {
return NettyStreamFactoryFactory.builder()
.applySettings(transportSettings)
.inetAddressResolver(inetAddressResolver)
.build();
}

private StreamFactoryHelper() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.security.NoSuchAlgorithmException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand All @@ -71,13 +72,18 @@ public class TlsChannelStreamFactoryFactory implements StreamFactoryFactory {
/**
* Construct a new instance
*/
public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver,
@Nullable final ExecutorService executorService) {
this.inetAddressResolver = inetAddressResolver;
this.group = new AsynchronousTlsChannelGroup();
this.group = new AsynchronousTlsChannelGroup(executorService);
selectorMonitor = new SelectorMonitor();
selectorMonitor.start();
}

public TlsChannelStreamFactoryFactory(final InetAddressResolver inetAddressResolver) {
this(inetAddressResolver, null);
}

@Override
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
assertTrue(sslSettings.isEnabled());
Expand Down
Loading