diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc
index 30af7553cd..c075f91453 100644
--- a/src/docs/asciidoc/api.adoc
+++ b/src/docs/asciidoc/api.adoc
@@ -234,6 +234,10 @@ Used as a prefix for connection names.
|Contract to change resolved node address to connect to.
|Pass-through (no-op)
+|`locatorConnectionCount`
+|Number of locator connections to maintain (for metadata search)
+|The smaller of the number of URIs and 3.
+
|`tls`
|Configuration helper for TLS.
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
@@ -293,8 +297,12 @@ include::{test-examples}/EnvironmentUsage.java[tag=address-resolver]
<1> Set the load balancer address
<2> Use load balancer address for initial connection
<3> Ignore metadata hints, always use load balancer
+<4> Set the number of locator connections to maintain
-The blog post covers the https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround].
+Note the example above sets the number of locator connections the environment maintains.
+Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to).
+The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment.
+This is why it is recommended to set the value explicitly, 3 being a good default.
==== Managing Streams
diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
index 64010d4d51..c5c5d3b624 100644
--- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
@@ -16,6 +16,7 @@
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.compression.CompressionCodecFactory;
+import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.sasl.CredentialsProvider;
import com.rabbitmq.stream.sasl.SaslConfiguration;
@@ -62,14 +63,15 @@ public interface EnvironmentBuilder {
* An {@link AddressResolver} to potentially change resolved node address to connect to.
*
*
Applications can use this abstraction to make sure connection attempts ignore metadata hints
- * and always go to a single point like a load balancer.
+ * and always go to a single point like a load balancer. Consider setting {@link
+ * #locatorConnectionCount(int)} when using a load balancer.
*
*
The default implementation does not perform any logic, it just returns the passed-in
* address.
*
*
The default implementation is overridden automatically if the following conditions are
* met: the host to connect to is localhost
, the user is guest
, and no
- * address resolver has been provided. The client will then always tries to connect to
+ * address resolver has been provided. The client will then always try to connect to
* localhost
to facilitate local development. Just provide a pass-through address resolver
* to avoid this behavior, e.g.:
*
@@ -79,10 +81,11 @@ public interface EnvironmentBuilder {
* .build();
*
*
- * @param addressResolver
+ * @param addressResolver the address resolver
* @return this builder instance
* @see "Connecting to
* Streams" blog post
+ * @see #locatorConnectionCount(int)
*/
EnvironmentBuilder addressResolver(AddressResolver addressResolver);
@@ -395,6 +398,27 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*/
EnvironmentBuilder forceLeaderForProducers(boolean forceLeader);
+ /**
+ * Set the expected number of "locator" connections to maintain.
+ *
+ *
Locator connections are used to perform infrastructure-related operations (e.g. looking up
+ * the topology of a stream to find an appropriate node to connect to).
+ *
+ *
It is recommended to maintain 2 to 3 locator connections. The environment uses the smaller
+ * of the number of passed-in URIs and 3 by default (see {@link #uris(List)}).
+ *
+ *
The number of locator connections should be explicitly set when a load balancer is used, as
+ * the environment cannot know the number of cluster nodes in this case (the only URI set is the
+ * one of the load balancer).
+ *
+ * @param locatorConnectionCount number of expected locator connections
+ * @return this builder instance
+ * @see #uris(List)
+ * @see #addressResolver(AddressResolver)
+ * @since 0.21.0
+ */
+ StreamEnvironmentBuilder locatorConnectionCount(int locatorConnectionCount);
+
/**
* Create the {@link Environment} instance.
*
diff --git a/src/main/java/com/rabbitmq/stream/Message.java b/src/main/java/com/rabbitmq/stream/Message.java
index 06757f4755..1c7a851b24 100644
--- a/src/main/java/com/rabbitmq/stream/Message.java
+++ b/src/main/java/com/rabbitmq/stream/Message.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -32,18 +32,26 @@ public interface Message {
/**
* Does this message has a publishing ID?
*
- *
Publishing IDs are used for de-duplication of outbound messages. They are not persisted.
+ *
Publishing IDs are used for deduplication of outbound messages. They are not persisted.
*
* @return true if the message has a publishing ID, false otherwise
+ * @see ProducerBuilder#name(String)
+ * @see Deduplication
+ * documentation
*/
boolean hasPublishingId();
/**
* Get the publishing ID for the message.
*
- *
Publishing IDs are used for de-duplication of outbound messages. They are not persisted.
+ *
Publishing IDs are used for deduplication of outbound messages. They are not persisted.
*
* @return the publishing ID of the message
+ * @see ProducerBuilder#name(String)
+ * @see Deduplication
+ * documentation
*/
long getPublishingId();
diff --git a/src/main/java/com/rabbitmq/stream/MessageBuilder.java b/src/main/java/com/rabbitmq/stream/MessageBuilder.java
index 908cc78534..5528a1f1d2 100644
--- a/src/main/java/com/rabbitmq/stream/MessageBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/MessageBuilder.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
+// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
@@ -38,12 +38,16 @@ public interface MessageBuilder {
Message build();
/**
- * Set the publishing ID (for de-duplication).
+ * Set the publishing ID (for deduplication).
*
*
This is value is used only for outbound messages and is not persisted.
*
* @param publishingId
* @return this builder instance
+ * @see ProducerBuilder#name(String)
+ * @see Deduplication
+ * documentation
*/
MessageBuilder publishingId(long publishingId);
diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
index 02d6f48274..dca01baf9f 100644
--- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java
@@ -23,12 +23,18 @@
public interface ProducerBuilder {
/**
- * The logical name of the producer.
+ * The producer name for deduplication (read the documentation
+ * before use).
*
- *
Set a value to enable de-duplication.
+ *
There must be only one producer instance at the same time using a given name.
*
* @param name
* @return this builder instance
+ * @see MessageBuilder#publishingId(long)
+ * @see Deduplication
+ * documentation
*/
ProducerBuilder name(String name);
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
index ed7f908b65..c817b1c33d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
@@ -18,6 +18,7 @@
import static com.rabbitmq.stream.impl.Utils.*;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.stream.Collectors.toList;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.MessageHandler.Context;
@@ -53,6 +54,7 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +83,7 @@ class StreamEnvironment implements Environment {
private final ByteBufAllocator byteBufAllocator;
private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false);
private final Runnable locatorInitializationSequence;
- private final List locators = new CopyOnWriteArrayList<>();
+ private final List locators;
private final ExecutorServiceFactory executorServiceFactory;
private final ObservationCollector> observationCollector;
@@ -105,7 +107,8 @@ class StreamEnvironment implements Environment {
boolean forceReplicaForConsumers,
boolean forceLeaderForProducers,
Duration producerNodeRetryDelay,
- Duration consumerNodeRetryDelay) {
+ Duration consumerNodeRetryDelay,
+ int expectedLocatorCount) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
this.byteBufAllocator = byteBufAllocator;
@@ -147,7 +150,7 @@ class StreamEnvironment implements Environment {
new Address(
uriItem.getHost() == null ? "localhost" : uriItem.getHost(),
uriItem.getPort() == -1 ? defaultPort : uriItem.getPort()))
- .collect(Collectors.toList());
+ .collect(toList());
}
AddressResolver addressResolverToUse = addressResolver;
@@ -179,7 +182,24 @@ class StreamEnvironment implements Environment {
this.addressResolver = addressResolverToUse;
- this.addresses.forEach(address -> this.locators.add(new Locator(address)));
+ int locatorCount;
+ if (expectedLocatorCount > 0) {
+ locatorCount = expectedLocatorCount;
+ } else {
+ locatorCount = Math.min(this.addresses.size(), 3);
+ }
+ LOGGER.debug("Using {} locator connection(s)", locatorCount);
+
+ List lctrs =
+ IntStream.range(0, locatorCount)
+ .mapToObj(
+ i -> {
+ Address addr = this.addresses.get(i % this.addresses.size());
+ return new Locator(addr);
+ })
+ .collect(toList());
+ this.locators = List.copyOf(lctrs);
+
this.executorServiceFactory =
new DefaultExecutorServiceFactory(
this.addresses.size(), 1, "rabbitmq-stream-locator-connection-");
@@ -230,8 +250,8 @@ class StreamEnvironment implements Environment {
Runnable locatorInitSequence =
() -> {
RuntimeException lastException = null;
- for (int i = 0; i < addresses.size(); i++) {
- Address address = addresses.get(i);
+ for (int i = 0; i < locators.size(); i++) {
+ Address address = addresses.get(i % addresses.size());
Locator locator = locator(i);
address = addressResolver.resolve(address);
String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR);
@@ -290,10 +310,10 @@ private ShutdownListener shutdownListener(
Client.ShutdownListener shutdownListener =
shutdownContext -> {
if (shutdownContext.isShutdownUnexpected()) {
+ String label = locator.label();
locator.client(null);
LOGGER.debug(
- "Unexpected locator disconnection for locator on '{}', trying to reconnect",
- locator.label());
+ "Unexpected locator disconnection for locator on '{}', trying to reconnect", label);
try {
Client.ClientParameters newLocatorParameters =
this.locatorParametersCopy().shutdownListener(shutdownListenerReference.get());
@@ -742,7 +762,7 @@ static T locatorOperation(
Function operation,
Supplier clientSupplier,
BackOffDelayPolicy backOffDelayPolicy) {
- int maxAttempt = 5;
+ int maxAttempt = 3;
int attempt = 0;
boolean executed = false;
Exception lastException = null;
@@ -991,7 +1011,9 @@ private String label() {
if (c == null) {
return address.host() + ":" + address.port();
} else {
- return c.getHost() + ":" + c.getPort();
+ return String.format(
+ "%s:%d [advertised %s:%d]",
+ c.getHost(), c.getPort(), c.serverAdvertisedHost(), c.serverAdvertisedPort());
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
index 0ec0a9dca2..0386ec79cd 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
@@ -70,6 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private ObservationCollector> observationCollector = ObservationCollector.NO_OP;
private Duration producerNodeRetryDelay = Duration.ofMillis(500);
private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);
+ private int locatorConnectionCount = -1;
public StreamEnvironmentBuilder() {}
@@ -315,6 +316,12 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay)
return this;
}
+ @Override
+ public StreamEnvironmentBuilder locatorConnectionCount(int locatorCount) {
+ this.locatorConnectionCount = locatorCount;
+ return this;
+ }
+
@Override
public Environment build() {
if (this.compressionCodecFactory == null) {
@@ -349,7 +356,8 @@ public Environment build() {
this.forceReplicaForConsumers,
this.forceLeaderForProducers,
this.producerNodeRetryDelay,
- this.consumerNodeRetryDelay);
+ this.consumerNodeRetryDelay,
+ this.locatorConnectionCount);
}
static final class DefaultTlsConfiguration implements TlsConfiguration {
diff --git a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
index 72a32daa10..d42e6662e4 100644
--- a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
+++ b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java
@@ -97,6 +97,7 @@ void addressResolver() throws Exception {
.host(entryPoint.host()) // <2>
.port(entryPoint.port()) // <2>
.addressResolver(address -> entryPoint) // <3>
+ .locatorConnectionCount(3) // <4>
.build();
// end::address-resolver[]
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
index 66f3514162..da098188c5 100644
--- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
@@ -16,19 +16,21 @@
import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS;
+import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel;
import static com.rabbitmq.stream.impl.TestUtils.sync;
-import static io.vavr.Tuple.of;
+import static com.rabbitmq.stream.impl.Tuples.pair;
import static java.time.Duration.ofSeconds;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
import ch.qos.logback.classic.Level;
+import com.google.common.collect.Streams;
import com.google.common.util.concurrent.RateLimiter;
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.TestUtils.Sync;
+import com.rabbitmq.stream.impl.Tuples.Pair;
import io.netty.channel.EventLoopGroup;
-import io.vavr.Tuple2;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.LinkedHashMap;
@@ -58,13 +60,14 @@ public class RecoveryClusterTest {
TestInfo testInfo;
EventLoopGroup eventLoopGroup;
EnvironmentBuilder environmentBuilder;
- static Level producersCoordinatorLogLevel;
+ static List logLevels;
+ static List> logClasses =
+ List.of(ProducersCoordinator.class, ConsumersCoordinator.class, StreamEnvironment.class);
@BeforeAll
static void initAll() {
nodes = Cli.nodes();
- producersCoordinatorLogLevel =
- TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG);
+ logLevels = logClasses.stream().map(c -> newLoggerLevel(c, Level.DEBUG)).collect(toList());
}
@BeforeEach
@@ -88,8 +91,9 @@ void tearDown() {
@AfterAll
static void tearDownAll() {
- if (producersCoordinatorLogLevel != null) {
- TestUtils.newLoggerLevel(ProducersCoordinator.class, producersCoordinatorLogLevel);
+ if (logLevels != null) {
+ Streams.zip(logClasses.stream(), logLevels.stream(), Tuples::pair)
+ .forEach(t -> newLoggerLevel(t.v1(), t.v2()));
}
}
@@ -108,9 +112,10 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
environmentBuilder
.host(LOAD_BALANCER_ADDRESS.host())
.port(LOAD_BALANCER_ADDRESS.port())
- .addressResolver(addr -> LOAD_BALANCER_ADDRESS);
+ .addressResolver(addr -> LOAD_BALANCER_ADDRESS)
+ .forceLeaderForProducers(forceLeader)
+ .locatorConnectionCount(URIS.size());
Duration nodeRetryDelay = Duration.ofMillis(100);
- environmentBuilder.forceLeaderForProducers(forceLeader);
// to make the test faster
((StreamEnvironmentBuilder) environmentBuilder).producerNodeRetryDelay(nodeRetryDelay);
((StreamEnvironmentBuilder) environmentBuilder).consumerNodeRetryDelay(nodeRetryDelay);
@@ -167,17 +172,27 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).multipliedBy(2).toMillis());
- List> streamsSyncs =
- producers.stream().map(p -> of(p.stream(), p.waitForNewMessages(1000))).collect(toList());
+ List> streamsSyncs =
+ producers.stream()
+ .map(p -> pair(p.stream(), p.waitForNewMessages(1000)))
+ .collect(toList());
streamsSyncs.forEach(
- t -> {
- LOGGER.info("Checking publisher to {} still publishes", t._1());
- assertThat(t._2()).completes();
- LOGGER.info("Publisher to {} still publishes", t._1());
+ p -> {
+ LOGGER.info("Checking publisher to {} still publishes", p.v1());
+ assertThat(p.v2()).completes();
+ LOGGER.info("Publisher to {} still publishes", p.v1());
});
- syncs = consumers.stream().map(c -> c.waitForNewMessages(1000)).collect(toList());
- syncs.forEach(s -> assertThat(s).completes());
+ streamsSyncs =
+ consumers.stream()
+ .map(c -> pair(c.stream(), c.waitForNewMessages(1000)))
+ .collect(toList());
+ streamsSyncs.forEach(
+ p -> {
+ LOGGER.info("Checking consumer from {} still consumes", p.v1());
+ assertThat(p.v2()).completes();
+ LOGGER.info("Consumer from {} still consumes", p.v1());
+ });
Map committedChunkIdPerStream = new LinkedHashMap<>(streamCount);
streams.forEach(
@@ -271,11 +286,13 @@ public void close() {
private static class ConsumerState implements AutoCloseable {
+ private final String stream;
private final Consumer consumer;
final AtomicInteger receivedCount = new AtomicInteger();
final AtomicReference postHandle = new AtomicReference<>(() -> {});
private ConsumerState(String stream, Environment environment) {
+ this.stream = stream;
this.consumer =
environment.consumerBuilder().stream(stream)
.offset(OffsetSpecification.first())
@@ -300,6 +317,10 @@ Sync waitForNewMessages(int messageCount) {
return sync;
}
+ String stream() {
+ return this.stream;
+ }
+
@Override
public void close() {
this.consumer.close();
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
index 1feebe6081..be2dc4a8c3 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java
@@ -475,8 +475,7 @@ void manualTrackingConsumerShouldRestartWhereItLeftOff() throws Exception {
@Test
@DisabledIfRabbitMqCtlNotSet
- void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived()
- throws Exception {
+ void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived() {
int messageCountFirstWave = 10_000;
Producer producer = environment.producerBuilder().stream(stream).build();
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
index 30b3da30bb..6052e1fee8 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
@@ -101,7 +101,8 @@ Client.ClientParameters duplicate() {
false,
true,
Duration.ofMillis(100),
- Duration.ofMillis(100));
+ Duration.ofMillis(100),
+ -1);
}
@AfterEach
@@ -169,7 +170,8 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
false,
true,
Duration.ofMillis(100),
- Duration.ofMillis(100));
+ Duration.ofMillis(100),
+ -1);
verify(cf, times(3)).apply(any(Client.ClientParameters.class));
}
@@ -200,7 +202,8 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
false,
true,
Duration.ofMillis(100),
- Duration.ofMillis(100));
+ Duration.ofMillis(100),
+ -1);
verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class));
}
@@ -245,7 +248,7 @@ void locatorOperationShouldThrowLocatorExceptionWhenRetryExhausts() {
CLIENT_SUPPLIER,
BackOffDelayPolicy.fixed(Duration.ofMillis(10))))
.isInstanceOf(LocatorNotAvailableException.class);
- assertThat(counter).hasValue(5);
+ assertThat(counter).hasValue(3);
}
@Test