Skip to content

Commit d72a4a3

Browse files
committed
spring-cloudGH-2685: Reactive Kafka Customizers - Add Generics
Resolves spring-cloud#2685 Required to allow customization of serializers/deserializers.
1 parent b814c0f commit d72a4a3

File tree

5 files changed

+31
-15
lines changed

5 files changed

+31
-15
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ public class ReactorKafkaBinder
9292

9393
private ProducerConfigCustomizer producerConfigCustomizer;
9494

95-
private ReceiverOptionsCustomizer receiverOptionsCustomizer = (name, opts) -> opts;
95+
private ReceiverOptionsCustomizer<Object, Object> receiverOptionsCustomizer = (name, opts) -> opts;
9696

97-
private SenderOptionsCustomizer senderOptionsCustomizer = (name, opts) -> opts;
97+
private SenderOptionsCustomizer<Object, Object> senderOptionsCustomizer = (name, opts) -> opts;
9898

9999
public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
100100
KafkaTopicProvisioner provisioner) {
@@ -111,6 +111,7 @@ public void setProducerConfigCustomizer(ProducerConfigCustomizer producerConfigC
111111
this.producerConfigCustomizer = producerConfigCustomizer;
112112
}
113113

114+
@SuppressWarnings({ "rawtypes", "unchecked" })
114115
public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer> customizers) {
115116
if (customizers.getIfUnique() != null) {
116117
this.receiverOptionsCustomizer = customizers.getIfUnique();
@@ -120,7 +121,7 @@ public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer>
120121
ReceiverOptionsCustomizer customizer = (name, opts) -> {
121122
ReceiverOptions<Object, Object> last = null;
122123
for (ReceiverOptionsCustomizer cust: list) {
123-
last = cust.apply(name, opts);
124+
last = (ReceiverOptions<Object, Object>) cust.apply(name, opts);
124125
}
125126
return last;
126127
};
@@ -130,6 +131,7 @@ public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer>
130131
}
131132
}
132133

134+
@SuppressWarnings({ "rawtypes", "unchecked" })
133135
public void senderOptionsCustomizers(ObjectProvider<SenderOptionsCustomizer> customizers) {
134136
if (customizers.getIfUnique() != null) {
135137
this.senderOptionsCustomizer = customizers.getIfUnique();
@@ -139,7 +141,7 @@ public void senderOptionsCustomizers(ObjectProvider<SenderOptionsCustomizer> cus
139141
SenderOptionsCustomizer customizer = (name, opts) -> {
140142
SenderOptions<Object, Object> last = null;
141143
for (SenderOptionsCustomizer cust: list) {
142-
last = cust.apply(name, opts);
144+
last = (SenderOptions<Object, Object>) cust.apply(name, opts);
143145
}
144146
return last;
145147
};
@@ -161,7 +163,8 @@ protected MessageHandler createProducerMessageHandler(ProducerDestination destin
161163
destination.getName());
162164
}
163165

164-
SenderOptions<Object, Object> opts = this.senderOptionsCustomizer.apply(producerProperties.getBindingName(),
166+
SenderOptions<Object, Object> opts =
167+
this.senderOptionsCustomizer.apply(producerProperties.getBindingName(),
165168
SenderOptions.create(configs));
166169
// TODO bean for converter; MCB doesn't use one on the producer side.
167170
RecordMessageConverter converter = new MessagingMessageConverter();
@@ -188,7 +191,8 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
188191
ReceiverOptions<Object, Object> opts = ReceiverOptions.create(configs)
189192
.addAssignListener(parts -> logger.info("Assigned: " + parts))
190193
.subscription(Collections.singletonList(destination.getName()));
191-
opts = this.receiverOptionsCustomizer.apply(properties.getBindingName(), opts);
194+
opts = this.receiverOptionsCustomizer.apply(properties.getBindingName(),
195+
opts);
192196
ReceiverOptions<Object, Object> finalOpts = opts;
193197

194198
class ReactorMessageProducer extends MessageProducerSupport {

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReceiverOptionsCustomizer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
* the second is the {@link ReceiverOptions} to customize. Return the customized
2828
* {@link ReceiverOptions}. Applied in {@link Ordered order} if multiple customizers are
2929
* found.
30+
* <p>
31+
* Generic since 4.0.3 to allow customization of deserializers.
32+
*
33+
* @param <K> the key type.
34+
* @param <V> the value type.
3035
*
3136
* @author Gary Russell
3237
* @since 4.0.2
3338
*
3439
*/
35-
public interface ReceiverOptionsCustomizer
36-
extends BiFunction<String, ReceiverOptions<Object, Object>, ReceiverOptions<Object, Object>>, Ordered {
40+
public interface ReceiverOptionsCustomizer<K, V>
41+
extends BiFunction<String, ReceiverOptions<K, V>, ReceiverOptions<K, V>>, Ordered {
3742

3843
@Override
3944
default int getOrder() {

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/SenderOptionsCustomizer.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
* second is the {@link SenderOptions} to customize. Return the customized
2828
* {@link SenderOptions}. Applied in {@link Ordered order} if multiple customizers are
2929
* found.
30+
* <p>
31+
* Generic since 4.0.3 to allow customization of serializers.
32+
*
33+
* @param <K> the key type.
34+
* @param <V> the value type.
3035
*
3136
* @author Gary Russell
3237
* @since 4.0.2
3338
*
3439
*/
35-
public interface SenderOptionsCustomizer
36-
extends BiFunction<String, SenderOptions<Object, Object>, SenderOptions<Object, Object>>, Ordered {
40+
public interface SenderOptionsCustomizer<K, V>
41+
extends BiFunction<String, SenderOptions<K, V>, SenderOptions<K, V>>, Ordered {
3742

3843
@Override
3944
default int getOrder() {

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerConfig;
2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
2828
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.apache.kafka.common.serialization.StringDeserializer;
2930
import org.junit.jupiter.api.extension.ExtendWith;
3031
import org.junit.jupiter.params.ParameterizedTest;
3132
import org.junit.jupiter.params.provider.ValueSource;
@@ -167,20 +168,21 @@ public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase()
167168
}
168169

169170
@Bean
170-
ReceiverOptionsCustomizer cust1() {
171+
ReceiverOptionsCustomizer<?, ?> cust1() {
171172
return (t, u) -> {
172173
recOptsCustOrder.add("one");
173174
return u;
174175
};
175176
}
176177

177178
@Bean
178-
ReceiverOptionsCustomizer cust2() {
179-
return new ReceiverOptionsCustomizer() {
179+
ReceiverOptionsCustomizer<String, byte[]> cust2() {
180+
return new ReceiverOptionsCustomizer<>() {
180181

181182
@Override
182-
public ReceiverOptions<Object, Object> apply(String t, ReceiverOptions<Object, Object> u) {
183+
public ReceiverOptions<String, byte[]> apply(String t, ReceiverOptions<String, byte[]> u) {
183184
recOptsCustOrder.add("two");
185+
u.withKeyDeserializer(new StringDeserializer());
184186
return u;
185187
}
186188

@@ -189,7 +191,6 @@ public int getOrder() {
189191
return -1;
190192
}
191193

192-
193194
};
194195
}
195196

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ void producerBinding() throws InterruptedException {
220220
provisioner.setMetadataRetryOperations(new RetryTemplate());
221221
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
222222
binder.setApplicationContext(new GenericApplicationContext());
223+
@SuppressWarnings("rawtypes")
223224
ObjectProvider<SenderOptionsCustomizer> cust = mock(ObjectProvider.class);
224225
AtomicBoolean custCalled = new AtomicBoolean();
225226
given(cust.getIfUnique()).willReturn((name, opts) -> {

0 commit comments

Comments
 (0)