Skip to content

Commit d976368

Browse files
committed
Added local bind address feature
1 parent 9580fe0 commit d976368

File tree

5 files changed

+146
-17
lines changed

5 files changed

+146
-17
lines changed

src/main/java/com/hivemq/client/internal/mqtt/MqttClientTransportConfigImpl.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,20 @@ public class MqttClientTransportConfigImpl implements MqttClientTransportConfig
3434

3535
public static final @NotNull MqttClientTransportConfigImpl DEFAULT = new MqttClientTransportConfigImpl(
3636
InetSocketAddress.createUnresolved(MqttClient.DEFAULT_SERVER_HOST, MqttClient.DEFAULT_SERVER_PORT), null,
37-
null);
37+
null, null);
3838

3939
private final @NotNull InetSocketAddress serverAddress;
40+
private final @Nullable InetSocketAddress localAddress;
4041
private final @Nullable MqttClientSslConfigImpl sslConfig;
4142
private final @Nullable MqttWebSocketConfigImpl webSocketConfig;
4243

4344
MqttClientTransportConfigImpl(
44-
final @NotNull InetSocketAddress serverAddress, final @Nullable MqttClientSslConfigImpl sslConfig,
45+
final @NotNull InetSocketAddress serverAddress, final @Nullable InetSocketAddress localAddress,
46+
final @Nullable MqttClientSslConfigImpl sslConfig,
4547
final @Nullable MqttWebSocketConfigImpl webSocketConfig) {
4648

4749
this.serverAddress = serverAddress;
50+
this.localAddress = localAddress;
4851
this.sslConfig = sslConfig;
4952
this.webSocketConfig = webSocketConfig;
5053
}
@@ -54,6 +57,15 @@ public class MqttClientTransportConfigImpl implements MqttClientTransportConfig
5457
return serverAddress;
5558
}
5659

60+
@Override
61+
public @NotNull Optional<InetSocketAddress> getLocalAddress() {
62+
return Optional.ofNullable(localAddress);
63+
}
64+
65+
public @Nullable InetSocketAddress getRawLocalAddress() {
66+
return localAddress;
67+
}
68+
5769
@Override
5870
public @NotNull Optional<MqttClientSslConfig> getSslConfig() {
5971
return Optional.ofNullable(sslConfig);

src/main/java/com/hivemq/client/internal/mqtt/MqttClientTransportConfigImplBuilder.java

+68-3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public abstract class MqttClientTransportConfigImplBuilder<B extends MqttClientT
3838
private @Nullable InetSocketAddress serverAddress;
3939
private @NotNull Object serverHost = DEFAULT_SERVER_HOST; // String or InetAddress
4040
private int serverPort = -1;
41+
private @Nullable InetSocketAddress localAddress;
4142
private @Nullable MqttClientSslConfigImpl sslConfig;
4243
private @Nullable MqttWebSocketConfigImpl webSocketConfig;
4344

@@ -51,6 +52,7 @@ public abstract class MqttClientTransportConfigImplBuilder<B extends MqttClientT
5152
serverAddress = builder.serverAddress;
5253
serverHost = builder.serverHost;
5354
serverPort = builder.serverPort;
55+
localAddress = builder.localAddress;
5456
sslConfig = builder.sslConfig;
5557
webSocketConfig = builder.webSocketConfig;
5658
}
@@ -78,8 +80,8 @@ void set(final @NotNull MqttClientTransportConfigImpl transportConfig) {
7880
return self();
7981
}
8082

81-
private void setServerHost(final @NotNull Object serverHost) {
82-
this.serverHost = serverHost;
83+
private void setServerHost(final @NotNull Object host) {
84+
serverHost = host;
8385
if (serverAddress != null) {
8486
serverPort = serverAddress.getPort();
8587
serverAddress = null;
@@ -100,6 +102,69 @@ private void setServerHost(final @NotNull Object serverHost) {
100102
return self();
101103
}
102104

105+
public @NotNull B localAddress(final @Nullable InetSocketAddress address) {
106+
if (address == null) {
107+
localAddress = null;
108+
} else {
109+
localAddress = checkLocalAddress(address);
110+
}
111+
return self();
112+
}
113+
114+
public @NotNull B localAddress(final @Nullable String address) {
115+
if (address == null) {
116+
removeLocalAddress();
117+
} else {
118+
localAddress = checkLocalAddress(new InetSocketAddress(address, getLocalPort()));
119+
}
120+
return self();
121+
}
122+
123+
public @NotNull B localAddress(final @Nullable InetAddress address) {
124+
if (address == null) {
125+
removeLocalAddress();
126+
} else {
127+
localAddress = new InetSocketAddress(address, getLocalPort());
128+
}
129+
return self();
130+
}
131+
132+
private @NotNull InetSocketAddress checkLocalAddress(final @NotNull InetSocketAddress address) {
133+
if (address.isUnresolved()) {
134+
throw new IllegalArgumentException("Local bind address must not be unresolved.");
135+
}
136+
return address;
137+
}
138+
139+
private void removeLocalAddress() {
140+
if ((localAddress != null) && (localAddress.getAddress() != null)) {
141+
if (localAddress.getPort() == 0) {
142+
localAddress = null;
143+
} else {
144+
localAddress = new InetSocketAddress(localAddress.getPort());
145+
}
146+
}
147+
}
148+
149+
private int getLocalPort() {
150+
return (localAddress == null) ? 0 : localAddress.getPort();
151+
}
152+
153+
public @NotNull B localPort(final int port) {
154+
if (port == 0) {
155+
if ((localAddress != null) && (localAddress.getPort() != 0)) {
156+
if (localAddress.getAddress() == null) {
157+
localAddress = null;
158+
} else {
159+
localAddress = new InetSocketAddress(localAddress.getAddress(), 0);
160+
}
161+
}
162+
} else {
163+
localAddress = new InetSocketAddress((localAddress == null) ? null : localAddress.getAddress(), port);
164+
}
165+
return self();
166+
}
167+
103168
public @NotNull B sslWithDefaultConfig() {
104169
this.sslConfig = MqttClientSslConfigImpl.DEFAULT;
105170
return self();
@@ -156,7 +221,7 @@ private int getServerPort() {
156221
}
157222

158223
@NotNull MqttClientTransportConfigImpl buildTransportConfig() {
159-
return new MqttClientTransportConfigImpl(getServerAddress(), sslConfig, webSocketConfig);
224+
return new MqttClientTransportConfigImpl(getServerAddress(), localAddress, sslConfig, webSocketConfig);
160225
}
161226

162227
public static class Default extends MqttClientTransportConfigImplBuilder<Default>

src/main/java/com/hivemq/client/internal/mqtt/handler/connect/MqttConnAckSingle.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -84,18 +84,24 @@ private static void connect(
8484
.build()
8585
.bootstrap();
8686

87-
bootstrap.group(eventLoop).connect(flow.getTransportConfig().getServerAddress()).addListener(future -> {
88-
final Throwable cause = future.cause();
89-
if (cause != null) {
90-
final ConnectionFailedException e = new ConnectionFailedException(cause);
91-
if (eventLoop.inEventLoop()) {
92-
reconnect(clientConfig, MqttDisconnectSource.CLIENT, e, connect, flow, eventLoop);
93-
} else {
94-
eventLoop.execute(() -> reconnect(clientConfig, MqttDisconnectSource.CLIENT, e, connect, flow,
95-
eventLoop));
96-
}
97-
}
98-
});
87+
final MqttClientTransportConfigImpl transportConfig = flow.getTransportConfig();
88+
89+
bootstrap.group(eventLoop)
90+
.localAddress(transportConfig.getRawLocalAddress())
91+
.connect(transportConfig.getServerAddress())
92+
.addListener(future -> {
93+
final Throwable cause = future.cause();
94+
if (cause != null) {
95+
final ConnectionFailedException e = new ConnectionFailedException(cause);
96+
if (eventLoop.inEventLoop()) {
97+
reconnect(clientConfig, MqttDisconnectSource.CLIENT, e, connect, flow, eventLoop);
98+
} else {
99+
eventLoop.execute(
100+
() -> reconnect(clientConfig, MqttDisconnectSource.CLIENT, e, connect, flow,
101+
eventLoop));
102+
}
103+
}
104+
});
99105
}
100106
}
101107

src/main/java/com/hivemq/client/mqtt/MqttClientTransportConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,12 @@ public interface MqttClientTransportConfig {
4747
*/
4848
@NotNull InetSocketAddress getServerAddress();
4949

50+
/**
51+
* @return the optional local bind address.
52+
* @since 1.2
53+
*/
54+
@NotNull Optional<InetSocketAddress> getLocalAddress();
55+
5056
/**
5157
* @return the optional secure transport configuration.
5258
*/

src/main/java/com/hivemq/client/mqtt/MqttClientTransportConfigBuilderBase.java

+40
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,46 @@ public interface MqttClientTransportConfigBuilderBase<B extends MqttClientTransp
6565
*/
6666
@NotNull B serverPort(int port);
6767

68+
/**
69+
* Sets the optional {@link MqttClientTransportConfig#getLocalAddress() local bind address}.
70+
* <p>
71+
* The address must be resolved.
72+
*
73+
* @param address the local bind address.
74+
* @return the builder.
75+
* @since 1.2
76+
*/
77+
@NotNull B localAddress(@Nullable InetSocketAddress address);
78+
79+
/**
80+
* Sets the optional local bind address.
81+
* <p>
82+
* The address must be resolvable.
83+
*
84+
* @param address the local bind address.
85+
* @return the builder.
86+
* @since 1.2
87+
*/
88+
@NotNull B localAddress(@Nullable String address);
89+
90+
/**
91+
* Sets the optional local bind address.
92+
*
93+
* @param address the local bind address
94+
* @return the builder.
95+
* @since 1.2
96+
*/
97+
@NotNull B localAddress(@Nullable InetAddress address);
98+
99+
/**
100+
* Sets the optional local bind port.
101+
*
102+
* @param port the local bind port.
103+
* @return the builder.
104+
* @since 1.2
105+
*/
106+
@NotNull B localPort(int port);
107+
68108
/**
69109
* Sets the {@link MqttClientTransportConfig#getSslConfig() secure transport configuration} to the default
70110
* configuration.

0 commit comments

Comments
 (0)