Skip to content

Commit 64835ef

Browse files
garyrussellsobychacko
authored andcommitted
spring-cloudGH-2685: Reactive Kafka Customizers - Add Generics
Resolves spring-cloud#2685 Required to allow customization of serializers/deserializers.
1 parent b814c0f commit 64835ef

File tree

5 files changed

+27
-13
lines changed

5 files changed

+27
-13
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinder.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,9 @@ public class ReactorKafkaBinder
9292

9393
private ProducerConfigCustomizer producerConfigCustomizer;
9494

95-
private ReceiverOptionsCustomizer receiverOptionsCustomizer = (name, opts) -> opts;
95+
private ReceiverOptionsCustomizer<Object, Object> receiverOptionsCustomizer = (name, opts) -> opts;
9696

97-
private SenderOptionsCustomizer senderOptionsCustomizer = (name, opts) -> opts;
97+
private SenderOptionsCustomizer<Object, Object> senderOptionsCustomizer = (name, opts) -> opts;
9898

9999
public ReactorKafkaBinder(KafkaBinderConfigurationProperties configurationProperties,
100100
KafkaTopicProvisioner provisioner) {
@@ -111,6 +111,7 @@ public void setProducerConfigCustomizer(ProducerConfigCustomizer producerConfigC
111111
this.producerConfigCustomizer = producerConfigCustomizer;
112112
}
113113

114+
@SuppressWarnings({ "rawtypes", "unchecked" })
114115
public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer> customizers) {
115116
if (customizers.getIfUnique() != null) {
116117
this.receiverOptionsCustomizer = customizers.getIfUnique();
@@ -120,7 +121,7 @@ public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer>
120121
ReceiverOptionsCustomizer customizer = (name, opts) -> {
121122
ReceiverOptions<Object, Object> last = null;
122123
for (ReceiverOptionsCustomizer cust: list) {
123-
last = cust.apply(name, opts);
124+
last = (ReceiverOptions<Object, Object>) cust.apply(name, opts);
124125
}
125126
return last;
126127
};
@@ -130,6 +131,7 @@ public void receiverOptionsCustomizers(ObjectProvider<ReceiverOptionsCustomizer>
130131
}
131132
}
132133

134+
@SuppressWarnings({ "rawtypes", "unchecked" })
133135
public void senderOptionsCustomizers(ObjectProvider<SenderOptionsCustomizer> customizers) {
134136
if (customizers.getIfUnique() != null) {
135137
this.senderOptionsCustomizer = customizers.getIfUnique();
@@ -139,7 +141,7 @@ public void senderOptionsCustomizers(ObjectProvider<SenderOptionsCustomizer> cus
139141
SenderOptionsCustomizer customizer = (name, opts) -> {
140142
SenderOptions<Object, Object> last = null;
141143
for (SenderOptionsCustomizer cust: list) {
142-
last = cust.apply(name, opts);
144+
last = (SenderOptions<Object, Object>) cust.apply(name, opts);
143145
}
144146
return last;
145147
};

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReceiverOptionsCustomizer.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
* the second is the {@link ReceiverOptions} to customize. Return the customized
2828
* {@link ReceiverOptions}. Applied in {@link Ordered order} if multiple customizers are
2929
* found.
30+
* <p>
31+
* Generic since 4.0.3 to allow customization of deserializers.
32+
*
33+
* @param <K> the key type.
34+
* @param <V> the value type.
3035
*
3136
* @author Gary Russell
3237
* @since 4.0.2
3338
*
3439
*/
35-
public interface ReceiverOptionsCustomizer
36-
extends BiFunction<String, ReceiverOptions<Object, Object>, ReceiverOptions<Object, Object>>, Ordered {
40+
public interface ReceiverOptionsCustomizer<K, V>
41+
extends BiFunction<String, ReceiverOptions<K, V>, ReceiverOptions<K, V>>, Ordered {
3742

3843
@Override
3944
default int getOrder() {

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/SenderOptionsCustomizer.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,18 @@
2727
* second is the {@link SenderOptions} to customize. Return the customized
2828
* {@link SenderOptions}. Applied in {@link Ordered order} if multiple customizers are
2929
* found.
30+
* <p>
31+
* Generic since 4.0.3 to allow customization of serializers.
32+
*
33+
* @param <K> the key type.
34+
* @param <V> the value type.
3035
*
3136
* @author Gary Russell
3237
* @since 4.0.2
3338
*
3439
*/
35-
public interface SenderOptionsCustomizer
36-
extends BiFunction<String, SenderOptions<Object, Object>, SenderOptions<Object, Object>>, Ordered {
40+
public interface SenderOptionsCustomizer<K, V>
41+
extends BiFunction<String, SenderOptions<K, V>, SenderOptions<K, V>>, Ordered {
3742

3843
@Override
3944
default int getOrder() {

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.ConsumerConfig;
2727
import org.apache.kafka.clients.consumer.ConsumerRecord;
2828
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.apache.kafka.common.serialization.StringDeserializer;
2930
import org.junit.jupiter.api.extension.ExtendWith;
3031
import org.junit.jupiter.params.ParameterizedTest;
3132
import org.junit.jupiter.params.provider.ValueSource;
@@ -167,20 +168,21 @@ public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase()
167168
}
168169

169170
@Bean
170-
ReceiverOptionsCustomizer cust1() {
171+
ReceiverOptionsCustomizer<?, ?> cust1() {
171172
return (t, u) -> {
172173
recOptsCustOrder.add("one");
173174
return u;
174175
};
175176
}
176177

177178
@Bean
178-
ReceiverOptionsCustomizer cust2() {
179-
return new ReceiverOptionsCustomizer() {
179+
ReceiverOptionsCustomizer<String, byte[]> cust2() {
180+
return new ReceiverOptionsCustomizer<>() {
180181

181182
@Override
182-
public ReceiverOptions<Object, Object> apply(String t, ReceiverOptions<Object, Object> u) {
183+
public ReceiverOptions<String, byte[]> apply(String t, ReceiverOptions<String, byte[]> u) {
183184
recOptsCustOrder.add("two");
185+
u.withKeyDeserializer(new StringDeserializer());
184186
return u;
185187
}
186188

@@ -189,7 +191,6 @@ public int getOrder() {
189191
return -1;
190192
}
191193

192-
193194
};
194195
}
195196

binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ void producerBinding() throws InterruptedException {
220220
provisioner.setMetadataRetryOperations(new RetryTemplate());
221221
ReactorKafkaBinder binder = new ReactorKafkaBinder(binderProps, provisioner);
222222
binder.setApplicationContext(new GenericApplicationContext());
223+
@SuppressWarnings("rawtypes")
223224
ObjectProvider<SenderOptionsCustomizer> cust = mock(ObjectProvider.class);
224225
AtomicBoolean custCalled = new AtomicBoolean();
225226
given(cust.getIfUnique()).willReturn((name, opts) -> {

0 commit comments

Comments
 (0)