Skip to content

Make subscription initial credits configurable #372

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/main/java/com/rabbitmq/stream/ConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ public interface ConsumerBuilder {
*/
ConsumerBuilder noTrackingStrategy();

/**
* Configure flow of messages.
*
* @return the flow configuration
*/
FlowConfiguration flow();

/**
* Create the configured {@link Consumer}
*
Expand Down Expand Up @@ -209,4 +216,25 @@ interface AutoTrackingStrategy {
*/
ConsumerBuilder builder();
}

/** Message flow configuration. */
interface FlowConfiguration {

/**
* The number of initial credits for the subscription.
*
* <p>Default is 1.
*
* @param initialCredits the number of initial credits
* @return this configuration instance
*/
FlowConfiguration initialCredits(int initialCredits);

/**
* Go back to the builder.
*
* @return the consumer builder
*/
ConsumerBuilder builder();
}
}
8 changes: 4 additions & 4 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1113,17 +1113,17 @@ public Response subscribe(
* @param subscriptionId identifier to correlate inbound messages to this subscription
* @param stream the stream to consume from
* @param offsetSpecification the specification of the offset to consume from
* @param credit the initial number of credits
* @param initialCredits the initial number of credits
* @param properties some optional properties to describe the subscription
* @return the subscription confirmation
*/
public Response subscribe(
byte subscriptionId,
String stream,
OffsetSpecification offsetSpecification,
int credit,
int initialCredits,
Map<String, String> properties) {
if (credit < 0 || credit > Short.MAX_VALUE) {
if (initialCredits < 0 || initialCredits > Short.MAX_VALUE) {
throw new IllegalArgumentException("Credit value must be between 0 and " + Short.MAX_VALUE);
}
int length = 2 + 2 + 4 + 1 + 2 + stream.length() + 2 + 2; // misses the offset
Expand Down Expand Up @@ -1152,7 +1152,7 @@ public Response subscribe(
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
bb.writeLong(offsetSpecification.getOffset());
}
bb.writeShort(credit);
bb.writeShort(initialCredits);
if (properties != null && !properties.isEmpty()) {
bb.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Expand Down
45 changes: 33 additions & 12 deletions src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ class StreamConsumerBuilder implements ConsumerBuilder {
private boolean noTrackingStrategy = false;
private boolean lazyInit = false;
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
private Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
private final Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
private ConsumerUpdateListener consumerUpdateListener;
private int initialCredits = 1;
private int additionalCredits = 1;
private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);

public StreamConsumerBuilder(StreamEnvironment environment) {
this.environment = environment;
Expand Down Expand Up @@ -132,13 +131,9 @@ public ConsumerBuilder noTrackingStrategy() {
return this;
}

public ConsumerBuilder credits(int initial, int onChunkDelivery) {
if (initial <= 0 || onChunkDelivery <= 0) {
throw new IllegalArgumentException("Credits must be positive");
}
this.initialCredits = initial;
this.additionalCredits = onChunkDelivery;
return this;
@Override
public FlowConfiguration flow() {
return this.flowConfiguration;
}

StreamConsumerBuilder lazyInit(boolean lazyInit) {
Expand Down Expand Up @@ -204,8 +199,8 @@ public Consumer build() {
this.subscriptionListener,
this.subscriptionProperties,
this.consumerUpdateListener,
this.initialCredits,
this.additionalCredits);
this.flowConfiguration.initialCredits,
this.flowConfiguration.additionalCredits);
environment.addConsumer((StreamConsumer) consumer);
} else {
if (Utils.isSac(this.subscriptionProperties)) {
Expand Down Expand Up @@ -342,6 +337,32 @@ StreamConsumerBuilder duplicate() {
return duplicate;
}

private static class DefaultFlowConfiguration implements FlowConfiguration {

private final ConsumerBuilder consumerBuilder;

private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
this.consumerBuilder = consumerBuilder;
}

private int initialCredits = 1;
private final int additionalCredits = 1;

@Override
public FlowConfiguration initialCredits(int initialCredits) {
if (initialCredits <= 0) {
throw new IllegalArgumentException("Credits must be positive");
}
this.initialCredits = initialCredits;
return this;
}

@Override
public ConsumerBuilder builder() {
return this.consumerBuilder;
}
}

// to help testing
public ConsumerUpdateListener consumerUpdateListener() {
return consumerUpdateListener;
Expand Down
22 changes: 13 additions & 9 deletions src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.metrics.MetricsCollector;
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
import com.rabbitmq.stream.perf.Utils.CreditSettings;
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
import io.micrometer.core.instrument.Counter;
Expand Down Expand Up @@ -415,13 +414,6 @@ public class StreamPerfTest implements Callable<Integer> {
defaultValue = "false")
private boolean metricsCommandLineArguments;

@CommandLine.Option(
names = {"--credits", "-cr"},
description = "initial and additional credits for subscriptions",
defaultValue = "1:1",
converter = Utils.CreditsTypeConverter.class)
private CreditSettings credits;

@CommandLine.Option(
names = {"--requested-max-frame-size", "-rmfs"},
description = "maximum frame size to request",
Expand Down Expand Up @@ -472,6 +464,13 @@ static class InstanceSyncOptions {
private String instanceSyncNamespace;
}

@CommandLine.Option(
names = {"--initial-credits", "-ic"},
description = "initial credits for subscription",
defaultValue = "1",
converter = Utils.NotNegativeIntegerTypeConverter.class)
private int initialCredits;

private MetricsCollector metricsCollector;
private PerformanceMetrics performanceMetrics;
private List<Monitoring> monitorings;
Expand Down Expand Up @@ -994,7 +993,12 @@ public Integer call() throws Exception {
AtomicLong messageCount = new AtomicLong(0);
String stream = stream(streams, i);
ConsumerBuilder consumerBuilder =
environment.consumerBuilder().offset(this.offset);
environment
.consumerBuilder()
.offset(this.offset)
.flow()
.initialCredits(this.initialCredits)
.builder();

if (this.superStreams) {
consumerBuilder.superStream(stream);
Expand Down
59 changes: 0 additions & 59 deletions src/main/java/com/rabbitmq/stream/perf/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -649,65 +649,6 @@ public Integer convert(String input) {
}
}

static class CreditsTypeConverter implements CommandLine.ITypeConverter<CreditSettings> {

@Override
public CreditSettings convert(String input) {
String errorMessage =
input + " is not a valid credits setting, " + "valid example values: 20:1, 15";
if (input == null || input.trim().isEmpty()) {
typeConversionException(errorMessage);
}
if (input.contains(":") || input.contains("-")) {
String separator = input.contains(":") ? ":" : "-";
String[] split = input.split(separator);
if (split.length != 2) {
typeConversionException(errorMessage);
} else {
int[] credits =
Arrays.stream(split)
.mapToInt(Integer::valueOf)
.peek(
c -> {
if (c <= 0) {
typeConversionException("credit values must be positive");
}
})
.toArray();
return new CreditSettings(credits[0], credits[1]);
}
}
try {
int value = Integer.parseInt(input);
if (value <= 0) {
typeConversionException("credit values must be positive");
}
return new CreditSettings(value, 1);
} catch (Exception e) {
typeConversionException(errorMessage);
}
return new CreditSettings(10, 1);
}
}

static class CreditSettings {

private final int initial, additional;

CreditSettings(int initial, int additional) {
this.initial = initial;
this.additional = additional;
}

int initial() {
return this.initial;
}

int additional() {
return this.additional;
}
}

private static void typeConversionException(String message) {
throw new TypeConversionException(message);
}
Expand Down
17 changes: 0 additions & 17 deletions src/test/java/com/rabbitmq/stream/perf/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
import com.rabbitmq.stream.perf.Utils.CreditSettings;
import com.rabbitmq.stream.perf.Utils.CreditsTypeConverter;
import com.rabbitmq.stream.perf.Utils.MetricsTagsTypeConverter;
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
Expand Down Expand Up @@ -370,21 +368,6 @@ void commandLineMetricsTest() {
.isEqualTo("-x 1 -y 2");
}

@ParameterizedTest
@CsvSource({"10:1,10,1", "20:10,20,10", "20,20,1", "20-10,20,10"})
void creditsConverterOk(String input, int expectedInitial, int expectedAdditional) {
CreditSettings credits = new CreditsTypeConverter().convert(input);
assertThat(credits.initial()).isEqualTo(expectedInitial);
assertThat(credits.additional()).isEqualTo(expectedAdditional);
}

@ParameterizedTest
@ValueSource(strings = {"foo", "-20:10", "20:-1"})
void creditsConverterKo(String input) {
assertThatThrownBy(() -> new CreditsTypeConverter().convert(input))
.isInstanceOf(TypeConversionException.class);
}

@Command(name = "test-command")
static class TestCommand {

Expand Down