From c4d0c489c9cfdb98812b71e5a9a73c9f812de14a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 10:43:59 +0100 Subject: [PATCH 1/8] Add log messages to recovery test --- .github/workflows/test-pr.yml | 36 ++++++------- .../stream/impl/StreamEnvironment.java | 2 +- .../stream/impl/RecoveryClusterTest.java | 54 +++++++++++++------ .../impl/StreamEnvironmentUnitTest.java | 2 +- 4 files changed, 57 insertions(+), 37 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index 380db0602c..cd6b55be74 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -22,24 +22,24 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - - name: Start broker - run: ci/start-broker.sh - - name: Test (no dynamic-batch publishing) - run: | - ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ - -Drabbitmq.stream.producer.dynamic.batch=false \ - -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ - -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ - -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - - name: Test (dynamic-batch publishing) - run: | - ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ - -Drabbitmq.stream.producer.dynamic.batch=true \ - -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ - -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ - -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem - - name: Stop broker - run: docker stop rabbitmq && docker rm rabbitmq +# - name: Start broker +# run: ci/start-broker.sh +# - name: Test (no dynamic-batch publishing) +# run: | +# ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ +# -Drabbitmq.stream.producer.dynamic.batch=false \ +# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ +# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ +# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem +# - name: Test (dynamic-batch publishing) +# run: | +# ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ +# -Drabbitmq.stream.producer.dynamic.batch=true \ +# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ +# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ +# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem +# - name: Stop broker +# run: docker stop rabbitmq && docker rm rabbitmq - name: Start cluster run: ci/start-cluster.sh - name: Test against cluster diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index ed7f908b65..3f7642cbec 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -742,7 +742,7 @@ static T locatorOperation( Function operation, Supplier clientSupplier, BackOffDelayPolicy backOffDelayPolicy) { - int maxAttempt = 5; + int maxAttempt = 3; int attempt = 0; boolean executed = false; Exception lastException = null; diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 66f3514162..44a6a48f15 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -16,19 +16,21 @@ import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS; +import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel; import static com.rabbitmq.stream.impl.TestUtils.sync; -import static io.vavr.Tuple.of; +import static com.rabbitmq.stream.impl.Tuples.pair; import static java.time.Duration.ofSeconds; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import ch.qos.logback.classic.Level; +import com.google.common.collect.Streams; import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.TestUtils.Sync; +import com.rabbitmq.stream.impl.Tuples.Pair; import io.netty.channel.EventLoopGroup; -import io.vavr.Tuple2; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.LinkedHashMap; @@ -58,13 +60,14 @@ public class RecoveryClusterTest { TestInfo testInfo; EventLoopGroup eventLoopGroup; EnvironmentBuilder environmentBuilder; - static Level producersCoordinatorLogLevel; + static List logLevels; + static List> logClasses = + List.of(ProducersCoordinator.class, ConsumersCoordinator.class, StreamEnvironment.class); @BeforeAll static void initAll() { nodes = Cli.nodes(); - producersCoordinatorLogLevel = - TestUtils.newLoggerLevel(ProducersCoordinator.class, Level.DEBUG); + logLevels = logClasses.stream().map(c -> newLoggerLevel(c, Level.DEBUG)).collect(toList()); } @BeforeEach @@ -88,15 +91,16 @@ void tearDown() { @AfterAll static void tearDownAll() { - if (producersCoordinatorLogLevel != null) { - TestUtils.newLoggerLevel(ProducersCoordinator.class, producersCoordinatorLogLevel); + if (logLevels != null) { + Streams.zip(logClasses.stream(), logLevels.stream(), Tuples::pair) + .forEach(t -> newLoggerLevel(t.v1(), t.v2())); } } @ParameterizedTest @CsvSource({ - "false,false", - "true,true", + // "false,false", + // "true,true", "true,false", }) void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException { @@ -167,17 +171,27 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).multipliedBy(2).toMillis()); - List> streamsSyncs = - producers.stream().map(p -> of(p.stream(), p.waitForNewMessages(1000))).collect(toList()); + List> streamsSyncs = + producers.stream() + .map(p -> pair(p.stream(), p.waitForNewMessages(1000))) + .collect(toList()); streamsSyncs.forEach( - t -> { - LOGGER.info("Checking publisher to {} still publishes", t._1()); - assertThat(t._2()).completes(); - LOGGER.info("Publisher to {} still publishes", t._1()); + p -> { + LOGGER.info("Checking publisher to {} still publishes", p.v1()); + assertThat(p.v2()).completes(); + LOGGER.info("Publisher to {} still publishes", p.v1()); }); - syncs = consumers.stream().map(c -> c.waitForNewMessages(1000)).collect(toList()); - syncs.forEach(s -> assertThat(s).completes()); + streamsSyncs = + consumers.stream() + .map(c -> pair(c.stream(), c.waitForNewMessages(1000))) + .collect(toList()); + streamsSyncs.forEach( + p -> { + LOGGER.info("Checking consumer from {} still consumes", p.v1()); + assertThat(p.v2()).completes(); + LOGGER.info("Consumer from {} still consumes", p.v1()); + }); Map committedChunkIdPerStream = new LinkedHashMap<>(streamCount); streams.forEach( @@ -271,11 +285,13 @@ public void close() { private static class ConsumerState implements AutoCloseable { + private final String stream; private final Consumer consumer; final AtomicInteger receivedCount = new AtomicInteger(); final AtomicReference postHandle = new AtomicReference<>(() -> {}); private ConsumerState(String stream, Environment environment) { + this.stream = stream; this.consumer = environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) @@ -300,6 +316,10 @@ Sync waitForNewMessages(int messageCount) { return sync; } + String stream() { + return this.stream; + } + @Override public void close() { this.consumer.close(); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index 30b3da30bb..5724e0f6f9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -245,7 +245,7 @@ void locatorOperationShouldThrowLocatorExceptionWhenRetryExhausts() { CLIENT_SUPPLIER, BackOffDelayPolicy.fixed(Duration.ofMillis(10)))) .isInstanceOf(LocatorNotAvailableException.class); - assertThat(counter).hasValue(5); + assertThat(counter).hasValue(3); } @Test From 5d1ecf2efe376afa0c1653ce26c55ac55c5b4f0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 11:15:30 +0100 Subject: [PATCH 2/8] Add locator count setting --- .../stream/impl/StreamEnvironment.java | 23 +++++++++++++++---- .../stream/impl/StreamEnvironmentBuilder.java | 9 +++++++- .../stream/impl/RecoveryClusterTest.java | 1 + .../stream/impl/StreamConsumerTest.java | 3 +-- .../impl/StreamEnvironmentUnitTest.java | 9 +++++--- 5 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 3f7642cbec..b8037eae8c 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -18,6 +18,7 @@ import static com.rabbitmq.stream.impl.Utils.*; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.Collectors.toList; import com.rabbitmq.stream.*; import com.rabbitmq.stream.MessageHandler.Context; @@ -53,6 +54,7 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.net.ssl.SSLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +83,7 @@ class StreamEnvironment implements Environment { private final ByteBufAllocator byteBufAllocator; private final AtomicBoolean locatorsInitialized = new AtomicBoolean(false); private final Runnable locatorInitializationSequence; - private final List locators = new CopyOnWriteArrayList<>(); + private final List locators; private final ExecutorServiceFactory executorServiceFactory; private final ObservationCollector observationCollector; @@ -105,7 +107,8 @@ class StreamEnvironment implements Environment { boolean forceReplicaForConsumers, boolean forceLeaderForProducers, Duration producerNodeRetryDelay, - Duration consumerNodeRetryDelay) { + Duration consumerNodeRetryDelay, + int expectedLocatorCount) { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; this.byteBufAllocator = byteBufAllocator; @@ -147,7 +150,7 @@ class StreamEnvironment implements Environment { new Address( uriItem.getHost() == null ? "localhost" : uriItem.getHost(), uriItem.getPort() == -1 ? defaultPort : uriItem.getPort())) - .collect(Collectors.toList()); + .collect(toList()); } AddressResolver addressResolverToUse = addressResolver; @@ -179,7 +182,19 @@ class StreamEnvironment implements Environment { this.addressResolver = addressResolverToUse; - this.addresses.forEach(address -> this.locators.add(new Locator(address))); + int locatorCount = Math.max(this.addresses.size(), expectedLocatorCount); + LOGGER.debug("Using {} locator connection(s)", locatorCount); + + List lctrs = + IntStream.range(0, locatorCount) + .mapToObj( + i -> { + Address addr = this.addresses.get(i % this.addresses.size()); + return new Locator(addr); + }) + .collect(toList()); + this.locators = List.copyOf(lctrs); + this.executorServiceFactory = new DefaultExecutorServiceFactory( this.addresses.size(), 1, "rabbitmq-stream-locator-connection-"); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index 0ec0a9dca2..f17446e985 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -70,6 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private ObservationCollector observationCollector = ObservationCollector.NO_OP; private Duration producerNodeRetryDelay = Duration.ofMillis(500); private Duration consumerNodeRetryDelay = Duration.ofMillis(1000); + private int locatorCount = 1; public StreamEnvironmentBuilder() {} @@ -315,6 +316,11 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay) return this; } + StreamEnvironmentBuilder locatorCount(int locatorCount) { + this.locatorCount = locatorCount; + return this; + } + @Override public Environment build() { if (this.compressionCodecFactory == null) { @@ -349,7 +355,8 @@ public Environment build() { this.forceReplicaForConsumers, this.forceLeaderForProducers, this.producerNodeRetryDelay, - this.consumerNodeRetryDelay); + this.consumerNodeRetryDelay, + this.locatorCount); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 44a6a48f15..55767f8c79 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -115,6 +115,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru .addressResolver(addr -> LOAD_BALANCER_ADDRESS); Duration nodeRetryDelay = Duration.ofMillis(100); environmentBuilder.forceLeaderForProducers(forceLeader); + ((StreamEnvironmentBuilder) environmentBuilder).locatorCount(URIS.size()); // to make the test faster ((StreamEnvironmentBuilder) environmentBuilder).producerNodeRetryDelay(nodeRetryDelay); ((StreamEnvironmentBuilder) environmentBuilder).consumerNodeRetryDelay(nodeRetryDelay); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index 1feebe6081..be2dc4a8c3 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -475,8 +475,7 @@ void manualTrackingConsumerShouldRestartWhereItLeftOff() throws Exception { @Test @DisabledIfRabbitMqCtlNotSet - void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived() - throws Exception { + void consumerShouldReUseInitialOffsetSpecificationAfterDisruptionIfNoMessagesReceived() { int messageCountFirstWave = 10_000; Producer producer = environment.producerBuilder().stream(stream).build(); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index 5724e0f6f9..8a136fed46 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -101,7 +101,8 @@ Client.ClientParameters duplicate() { false, true, Duration.ofMillis(100), - Duration.ofMillis(100)); + Duration.ofMillis(100), + 1); } @AfterEach @@ -169,7 +170,8 @@ void shouldTryUrisOnInitializationFailure() throws Exception { false, true, Duration.ofMillis(100), - Duration.ofMillis(100)); + Duration.ofMillis(100), + 1); verify(cf, times(3)).apply(any(Client.ClientParameters.class)); } @@ -200,7 +202,8 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( false, true, Duration.ofMillis(100), - Duration.ofMillis(100)); + Duration.ofMillis(100), + 1); verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class)); } From f93c07cf6c17035a0ee88014f5dbcdd036c59072 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 11:52:54 +0100 Subject: [PATCH 3/8] Fix locator initialization --- .../com/rabbitmq/stream/impl/StreamEnvironment.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index b8037eae8c..53df31709a 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -245,8 +245,8 @@ class StreamEnvironment implements Environment { Runnable locatorInitSequence = () -> { RuntimeException lastException = null; - for (int i = 0; i < addresses.size(); i++) { - Address address = addresses.get(i); + for (int i = 0; i < locators.size(); i++) { + Address address = addresses.get(i % addresses.size()); Locator locator = locator(i); address = addressResolver.resolve(address); String connectionName = connectionNamingStrategy.apply(ClientConnectionType.LOCATOR); @@ -305,10 +305,10 @@ private ShutdownListener shutdownListener( Client.ShutdownListener shutdownListener = shutdownContext -> { if (shutdownContext.isShutdownUnexpected()) { + String label = locator.label(); locator.client(null); LOGGER.debug( - "Unexpected locator disconnection for locator on '{}', trying to reconnect", - locator.label()); + "Unexpected locator disconnection for locator on '{}', trying to reconnect", label); try { Client.ClientParameters newLocatorParameters = this.locatorParametersCopy().shutdownListener(shutdownListenerReference.get()); @@ -1006,7 +1006,9 @@ private String label() { if (c == null) { return address.host() + ":" + address.port(); } else { - return c.getHost() + ":" + c.getPort(); + return String.format( + "%s:%d [advertised %s:%d]", + c.getHost(), c.getPort(), c.serverAdvertisedHost(), c.serverAdvertisedPort()); } } From 742300108a14d3ba343bc4b6044f76c23bdeb0bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 12:03:07 +0100 Subject: [PATCH 4/8] Re-activate all recovery test cases --- .../java/com/rabbitmq/stream/impl/RecoveryClusterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 55767f8c79..b6476670fd 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -99,8 +99,8 @@ static void tearDownAll() { @ParameterizedTest @CsvSource({ - // "false,false", - // "true,true", + "false,false", + "true,true", "true,false", }) void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException { From e07cd048dbd356e0123afcdaad5103aa97fa50fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 13:07:27 +0100 Subject: [PATCH 5/8] Re-activate all tests --- .github/workflows/test-pr.yml | 36 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index cd6b55be74..380db0602c 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -22,24 +22,24 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' -# - name: Start broker -# run: ci/start-broker.sh -# - name: Test (no dynamic-batch publishing) -# run: | -# ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ -# -Drabbitmq.stream.producer.dynamic.batch=false \ -# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem -# - name: Test (dynamic-batch publishing) -# run: | -# ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ -# -Drabbitmq.stream.producer.dynamic.batch=true \ -# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ -# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ -# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem -# - name: Stop broker -# run: docker stop rabbitmq && docker rm rabbitmq + - name: Start broker + run: ci/start-broker.sh + - name: Test (no dynamic-batch publishing) + run: | + ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=false \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Test (dynamic-batch publishing) + run: | + ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \ + -Drabbitmq.stream.producer.dynamic.batch=true \ + -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \ + -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \ + -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem + - name: Stop broker + run: docker stop rabbitmq && docker rm rabbitmq - name: Start cluster run: ci/start-cluster.sh - name: Test against cluster From 69c14d27f3b9fd4d0ab5b7ba6b0741fbadccd2d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 16:26:49 +0100 Subject: [PATCH 6/8] Document locator connection count setting --- src/docs/asciidoc/api.adoc | 10 ++++++- .../rabbitmq/stream/EnvironmentBuilder.java | 30 +++++++++++++++++-- .../stream/impl/StreamEnvironment.java | 7 ++++- .../stream/impl/StreamEnvironmentBuilder.java | 9 +++--- .../stream/docs/EnvironmentUsage.java | 1 + .../stream/impl/RecoveryClusterTest.java | 6 ++-- 6 files changed, 51 insertions(+), 12 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index 30af7553cd..c075f91453 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -234,6 +234,10 @@ Used as a prefix for connection names. |Contract to change resolved node address to connect to. |Pass-through (no-op) +|`locatorConnectionCount` +|Number of locator connections to maintain (for metadata search) +|The smaller of the number of URIs and 3. + |`tls` |Configuration helper for TLS. |TLS is enabled if a `rabbitmq-stream+tls` URI is provided. @@ -293,8 +297,12 @@ include::{test-examples}/EnvironmentUsage.java[tag=address-resolver] <1> Set the load balancer address <2> Use load balancer address for initial connection <3> Ignore metadata hints, always use load balancer +<4> Set the number of locator connections to maintain -The blog post covers the https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround]. +Note the example above sets the number of locator connections the environment maintains. +Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to). +The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment. +This is why it is recommended to set the value explicitly, 3 being a good default. ==== Managing Streams diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 64010d4d51..c5c5d3b624 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -16,6 +16,7 @@ import com.rabbitmq.stream.compression.Compression; import com.rabbitmq.stream.compression.CompressionCodecFactory; +import com.rabbitmq.stream.impl.StreamEnvironmentBuilder; import com.rabbitmq.stream.metrics.MetricsCollector; import com.rabbitmq.stream.sasl.CredentialsProvider; import com.rabbitmq.stream.sasl.SaslConfiguration; @@ -62,14 +63,15 @@ public interface EnvironmentBuilder { * An {@link AddressResolver} to potentially change resolved node address to connect to. * *

Applications can use this abstraction to make sure connection attempts ignore metadata hints - * and always go to a single point like a load balancer. + * and always go to a single point like a load balancer. Consider setting {@link + * #locatorConnectionCount(int)} when using a load balancer. * *

The default implementation does not perform any logic, it just returns the passed-in * address. * *

The default implementation is overridden automatically if the following conditions are * met: the host to connect to is localhost, the user is guest, and no - * address resolver has been provided. The client will then always tries to connect to + * address resolver has been provided. The client will then always try to connect to * localhost to facilitate local development. Just provide a pass-through address resolver * to avoid this behavior, e.g.: * @@ -79,10 +81,11 @@ public interface EnvironmentBuilder { * .build(); * * - * @param addressResolver + * @param addressResolver the address resolver * @return this builder instance * @see "Connecting to * Streams" blog post + * @see #locatorConnectionCount(int) */ EnvironmentBuilder addressResolver(AddressResolver addressResolver); @@ -395,6 +398,27 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( */ EnvironmentBuilder forceLeaderForProducers(boolean forceLeader); + /** + * Set the expected number of "locator" connections to maintain. + * + *

Locator connections are used to perform infrastructure-related operations (e.g. looking up + * the topology of a stream to find an appropriate node to connect to). + * + *

It is recommended to maintain 2 to 3 locator connections. The environment uses the smaller + * of the number of passed-in URIs and 3 by default (see {@link #uris(List)}). + * + *

The number of locator connections should be explicitly set when a load balancer is used, as + * the environment cannot know the number of cluster nodes in this case (the only URI set is the + * one of the load balancer). + * + * @param locatorConnectionCount number of expected locator connections + * @return this builder instance + * @see #uris(List) + * @see #addressResolver(AddressResolver) + * @since 0.21.0 + */ + StreamEnvironmentBuilder locatorConnectionCount(int locatorConnectionCount); + /** * Create the {@link Environment} instance. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 53df31709a..c817b1c33d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -182,7 +182,12 @@ class StreamEnvironment implements Environment { this.addressResolver = addressResolverToUse; - int locatorCount = Math.max(this.addresses.size(), expectedLocatorCount); + int locatorCount; + if (expectedLocatorCount > 0) { + locatorCount = expectedLocatorCount; + } else { + locatorCount = Math.min(this.addresses.size(), 3); + } LOGGER.debug("Using {} locator connection(s)", locatorCount); List lctrs = diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index f17446e985..0386ec79cd 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -70,7 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private ObservationCollector observationCollector = ObservationCollector.NO_OP; private Duration producerNodeRetryDelay = Duration.ofMillis(500); private Duration consumerNodeRetryDelay = Duration.ofMillis(1000); - private int locatorCount = 1; + private int locatorConnectionCount = -1; public StreamEnvironmentBuilder() {} @@ -316,8 +316,9 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay) return this; } - StreamEnvironmentBuilder locatorCount(int locatorCount) { - this.locatorCount = locatorCount; + @Override + public StreamEnvironmentBuilder locatorConnectionCount(int locatorCount) { + this.locatorConnectionCount = locatorCount; return this; } @@ -356,7 +357,7 @@ public Environment build() { this.forceLeaderForProducers, this.producerNodeRetryDelay, this.consumerNodeRetryDelay, - this.locatorCount); + this.locatorConnectionCount); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java index 72a32daa10..d42e6662e4 100644 --- a/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java +++ b/src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java @@ -97,6 +97,7 @@ void addressResolver() throws Exception { .host(entryPoint.host()) // <2> .port(entryPoint.port()) // <2> .addressResolver(address -> entryPoint) // <3> + .locatorConnectionCount(3) // <4> .build(); // end::address-resolver[] } diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index b6476670fd..da098188c5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -112,10 +112,10 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru environmentBuilder .host(LOAD_BALANCER_ADDRESS.host()) .port(LOAD_BALANCER_ADDRESS.port()) - .addressResolver(addr -> LOAD_BALANCER_ADDRESS); + .addressResolver(addr -> LOAD_BALANCER_ADDRESS) + .forceLeaderForProducers(forceLeader) + .locatorConnectionCount(URIS.size()); Duration nodeRetryDelay = Duration.ofMillis(100); - environmentBuilder.forceLeaderForProducers(forceLeader); - ((StreamEnvironmentBuilder) environmentBuilder).locatorCount(URIS.size()); // to make the test faster ((StreamEnvironmentBuilder) environmentBuilder).producerNodeRetryDelay(nodeRetryDelay); ((StreamEnvironmentBuilder) environmentBuilder).consumerNodeRetryDelay(nodeRetryDelay); From 3784060f349a554be977bdaaffe9e813b1d4e972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 16:44:40 +0100 Subject: [PATCH 7/8] Fix test --- .../com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index 8a136fed46..6052e1fee8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -102,7 +102,7 @@ Client.ClientParameters duplicate() { true, Duration.ofMillis(100), Duration.ofMillis(100), - 1); + -1); } @AfterEach @@ -171,7 +171,7 @@ void shouldTryUrisOnInitializationFailure() throws Exception { true, Duration.ofMillis(100), Duration.ofMillis(100), - 1); + -1); verify(cf, times(3)).apply(any(Client.ClientParameters.class)); } @@ -203,7 +203,7 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( true, Duration.ofMillis(100), Duration.ofMillis(100), - 1); + -1); verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class)); } From 3a0056a9663a186495b47769189df7251b266d59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 29 Nov 2024 16:59:28 +0100 Subject: [PATCH 8/8] Improve deduplication Javadoc --- src/main/java/com/rabbitmq/stream/Message.java | 14 +++++++++++--- .../java/com/rabbitmq/stream/MessageBuilder.java | 8 ++++++-- .../java/com/rabbitmq/stream/ProducerBuilder.java | 10 ++++++++-- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/Message.java b/src/main/java/com/rabbitmq/stream/Message.java index 06757f4755..1c7a851b24 100644 --- a/src/main/java/com/rabbitmq/stream/Message.java +++ b/src/main/java/com/rabbitmq/stream/Message.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -32,18 +32,26 @@ public interface Message { /** * Does this message has a publishing ID? * - *

Publishing IDs are used for de-duplication of outbound messages. They are not persisted. + *

Publishing IDs are used for deduplication of outbound messages. They are not persisted. * * @return true if the message has a publishing ID, false otherwise + * @see ProducerBuilder#name(String) + * @see Deduplication + * documentation */ boolean hasPublishingId(); /** * Get the publishing ID for the message. * - *

Publishing IDs are used for de-duplication of outbound messages. They are not persisted. + *

Publishing IDs are used for deduplication of outbound messages. They are not persisted. * * @return the publishing ID of the message + * @see ProducerBuilder#name(String) + * @see Deduplication + * documentation */ long getPublishingId(); diff --git a/src/main/java/com/rabbitmq/stream/MessageBuilder.java b/src/main/java/com/rabbitmq/stream/MessageBuilder.java index 908cc78534..5528a1f1d2 100644 --- a/src/main/java/com/rabbitmq/stream/MessageBuilder.java +++ b/src/main/java/com/rabbitmq/stream/MessageBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -38,12 +38,16 @@ public interface MessageBuilder { Message build(); /** - * Set the publishing ID (for de-duplication). + * Set the publishing ID (for deduplication). * *

This is value is used only for outbound messages and is not persisted. * * @param publishingId * @return this builder instance + * @see ProducerBuilder#name(String) + * @see Deduplication + * documentation */ MessageBuilder publishingId(long publishingId); diff --git a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java index 02d6f48274..dca01baf9f 100644 --- a/src/main/java/com/rabbitmq/stream/ProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/ProducerBuilder.java @@ -23,12 +23,18 @@ public interface ProducerBuilder { /** - * The logical name of the producer. + * The producer name for deduplication (read the documentation + * before use). * - *

Set a value to enable de-duplication. + *

There must be only one producer instance at the same time using a given name. * * @param name * @return this builder instance + * @see MessageBuilder#publishingId(long) + * @see Deduplication + * documentation */ ProducerBuilder name(String name);