diff --git a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
index c4d1e3f1f7..7484ddc433 100644
--- a/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
@@ -151,6 +151,13 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder noTrackingStrategy();
+ /**
+ * Configure flow of messages.
+ *
+ * @return the flow configuration
+ */
+ FlowConfiguration flow();
+
/**
* Create the configured {@link Consumer}
*
@@ -209,4 +216,25 @@ interface AutoTrackingStrategy {
*/
ConsumerBuilder builder();
}
+
+ /** Message flow configuration. */
+ interface FlowConfiguration {
+
+ /**
+ * The number of initial credits for the subscription.
+ *
+ *
Default is 1.
+ *
+ * @param initialCredits the number of initial credits
+ * @return this configuration instance
+ */
+ FlowConfiguration initialCredits(int initialCredits);
+
+ /**
+ * Go back to the builder.
+ *
+ * @return the consumer builder
+ */
+ ConsumerBuilder builder();
+ }
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index 587282e7d3..f85fa9a51a 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -1113,7 +1113,7 @@ public Response subscribe(
* @param subscriptionId identifier to correlate inbound messages to this subscription
* @param stream the stream to consume from
* @param offsetSpecification the specification of the offset to consume from
- * @param credit the initial number of credits
+ * @param initialCredits the initial number of credits
* @param properties some optional properties to describe the subscription
* @return the subscription confirmation
*/
@@ -1121,9 +1121,9 @@ public Response subscribe(
byte subscriptionId,
String stream,
OffsetSpecification offsetSpecification,
- int credit,
+ int initialCredits,
Map properties) {
- if (credit < 0 || credit > Short.MAX_VALUE) {
+ if (initialCredits < 0 || initialCredits > Short.MAX_VALUE) {
throw new IllegalArgumentException("Credit value must be between 0 and " + Short.MAX_VALUE);
}
int length = 2 + 2 + 4 + 1 + 2 + stream.length() + 2 + 2; // misses the offset
@@ -1152,7 +1152,7 @@ public Response subscribe(
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
bb.writeLong(offsetSpecification.getOffset());
}
- bb.writeShort(credit);
+ bb.writeShort(initialCredits);
if (properties != null && !properties.isEmpty()) {
bb.writeInt(properties.size());
for (Map.Entry entry : properties.entrySet()) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
index 7dd98a05fe..7d829e1f7f 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
@@ -40,10 +40,9 @@ class StreamConsumerBuilder implements ConsumerBuilder {
private boolean noTrackingStrategy = false;
private boolean lazyInit = false;
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
- private Map subscriptionProperties = new ConcurrentHashMap<>();
+ private final Map subscriptionProperties = new ConcurrentHashMap<>();
private ConsumerUpdateListener consumerUpdateListener;
- private int initialCredits = 1;
- private int additionalCredits = 1;
+ private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);
public StreamConsumerBuilder(StreamEnvironment environment) {
this.environment = environment;
@@ -132,13 +131,9 @@ public ConsumerBuilder noTrackingStrategy() {
return this;
}
- public ConsumerBuilder credits(int initial, int onChunkDelivery) {
- if (initial <= 0 || onChunkDelivery <= 0) {
- throw new IllegalArgumentException("Credits must be positive");
- }
- this.initialCredits = initial;
- this.additionalCredits = onChunkDelivery;
- return this;
+ @Override
+ public FlowConfiguration flow() {
+ return this.flowConfiguration;
}
StreamConsumerBuilder lazyInit(boolean lazyInit) {
@@ -204,8 +199,8 @@ public Consumer build() {
this.subscriptionListener,
this.subscriptionProperties,
this.consumerUpdateListener,
- this.initialCredits,
- this.additionalCredits);
+ this.flowConfiguration.initialCredits,
+ this.flowConfiguration.additionalCredits);
environment.addConsumer((StreamConsumer) consumer);
} else {
if (Utils.isSac(this.subscriptionProperties)) {
@@ -342,6 +337,32 @@ StreamConsumerBuilder duplicate() {
return duplicate;
}
+ private static class DefaultFlowConfiguration implements FlowConfiguration {
+
+ private final ConsumerBuilder consumerBuilder;
+
+ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
+ this.consumerBuilder = consumerBuilder;
+ }
+
+ private int initialCredits = 1;
+ private final int additionalCredits = 1;
+
+ @Override
+ public FlowConfiguration initialCredits(int initialCredits) {
+ if (initialCredits <= 0) {
+ throw new IllegalArgumentException("Credits must be positive");
+ }
+ this.initialCredits = initialCredits;
+ return this;
+ }
+
+ @Override
+ public ConsumerBuilder builder() {
+ return this.consumerBuilder;
+ }
+ }
+
// to help testing
public ConsumerUpdateListener consumerUpdateListener() {
return consumerUpdateListener;
diff --git a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
index 142690cca7..73136cb0fd 100644
--- a/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
+++ b/src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
@@ -46,7 +46,6 @@
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
-import com.rabbitmq.stream.perf.Utils.CreditSettings;
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
import io.micrometer.core.instrument.Counter;
@@ -415,13 +414,6 @@ public class StreamPerfTest implements Callable {
defaultValue = "false")
private boolean metricsCommandLineArguments;
- @CommandLine.Option(
- names = {"--credits", "-cr"},
- description = "initial and additional credits for subscriptions",
- defaultValue = "1:1",
- converter = Utils.CreditsTypeConverter.class)
- private CreditSettings credits;
-
@CommandLine.Option(
names = {"--requested-max-frame-size", "-rmfs"},
description = "maximum frame size to request",
@@ -472,6 +464,13 @@ static class InstanceSyncOptions {
private String instanceSyncNamespace;
}
+ @CommandLine.Option(
+ names = {"--initial-credits", "-ic"},
+ description = "initial credits for subscription",
+ defaultValue = "1",
+ converter = Utils.NotNegativeIntegerTypeConverter.class)
+ private int initialCredits;
+
private MetricsCollector metricsCollector;
private PerformanceMetrics performanceMetrics;
private List monitorings;
@@ -994,7 +993,12 @@ public Integer call() throws Exception {
AtomicLong messageCount = new AtomicLong(0);
String stream = stream(streams, i);
ConsumerBuilder consumerBuilder =
- environment.consumerBuilder().offset(this.offset);
+ environment
+ .consumerBuilder()
+ .offset(this.offset)
+ .flow()
+ .initialCredits(this.initialCredits)
+ .builder();
if (this.superStreams) {
consumerBuilder.superStream(stream);
diff --git a/src/main/java/com/rabbitmq/stream/perf/Utils.java b/src/main/java/com/rabbitmq/stream/perf/Utils.java
index 5569bb521f..f123134cac 100644
--- a/src/main/java/com/rabbitmq/stream/perf/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/perf/Utils.java
@@ -649,65 +649,6 @@ public Integer convert(String input) {
}
}
- static class CreditsTypeConverter implements CommandLine.ITypeConverter {
-
- @Override
- public CreditSettings convert(String input) {
- String errorMessage =
- input + " is not a valid credits setting, " + "valid example values: 20:1, 15";
- if (input == null || input.trim().isEmpty()) {
- typeConversionException(errorMessage);
- }
- if (input.contains(":") || input.contains("-")) {
- String separator = input.contains(":") ? ":" : "-";
- String[] split = input.split(separator);
- if (split.length != 2) {
- typeConversionException(errorMessage);
- } else {
- int[] credits =
- Arrays.stream(split)
- .mapToInt(Integer::valueOf)
- .peek(
- c -> {
- if (c <= 0) {
- typeConversionException("credit values must be positive");
- }
- })
- .toArray();
- return new CreditSettings(credits[0], credits[1]);
- }
- }
- try {
- int value = Integer.parseInt(input);
- if (value <= 0) {
- typeConversionException("credit values must be positive");
- }
- return new CreditSettings(value, 1);
- } catch (Exception e) {
- typeConversionException(errorMessage);
- }
- return new CreditSettings(10, 1);
- }
- }
-
- static class CreditSettings {
-
- private final int initial, additional;
-
- CreditSettings(int initial, int additional) {
- this.initial = initial;
- this.additional = additional;
- }
-
- int initial() {
- return this.initial;
- }
-
- int additional() {
- return this.additional;
- }
- }
-
private static void typeConversionException(String message) {
throw new TypeConversionException(message);
}
diff --git a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
index dca7e1a22c..f3642b7586 100644
--- a/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
+++ b/src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
@@ -24,8 +24,6 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
-import com.rabbitmq.stream.perf.Utils.CreditSettings;
-import com.rabbitmq.stream.perf.Utils.CreditsTypeConverter;
import com.rabbitmq.stream.perf.Utils.MetricsTagsTypeConverter;
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
@@ -370,21 +368,6 @@ void commandLineMetricsTest() {
.isEqualTo("-x 1 -y 2");
}
- @ParameterizedTest
- @CsvSource({"10:1,10,1", "20:10,20,10", "20,20,1", "20-10,20,10"})
- void creditsConverterOk(String input, int expectedInitial, int expectedAdditional) {
- CreditSettings credits = new CreditsTypeConverter().convert(input);
- assertThat(credits.initial()).isEqualTo(expectedInitial);
- assertThat(credits.additional()).isEqualTo(expectedAdditional);
- }
-
- @ParameterizedTest
- @ValueSource(strings = {"foo", "-20:10", "20:-1"})
- void creditsConverterKo(String input) {
- assertThatThrownBy(() -> new CreditsTypeConverter().convert(input))
- .isInstanceOf(TypeConversionException.class);
- }
-
@Command(name = "test-command")
static class TestCommand {