Skip to content

Commit 4d18ded

Browse files
committed
spring-projectsGH-3661: Resolve Spring Apache Kafka Deprecations
Resolves spring-projects#3661
1 parent 11c3acd commit 4d18ded

File tree

14 files changed

+664
-376
lines changed

14 files changed

+664
-376
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,8 +42,8 @@
4242
import reactor.core.publisher.Flux;
4343

4444
/**
45-
* A support class for producer endpoints that provides a setter for the
46-
* output channel and a convenience method for sending Messages.
45+
* A support class for producer endpoints that provides a setter for the output channel
46+
* and a convenience method for sending Messages.
4747
*
4848
* @author Mark Fisher
4949
* @author Artem Bilan
@@ -76,8 +76,8 @@ public void setOutputChannel(MessageChannel outputChannel) {
7676
}
7777

7878
/**
79-
* Set the output channel name; overrides
80-
* {@link #setOutputChannel(MessageChannel) outputChannel} if provided.
79+
* Set the output channel name; overrides {@link #setOutputChannel(MessageChannel)
80+
* outputChannel} if provided.
8181
* @param outputChannelName the channel name.
8282
* @since 4.3
8383
*/
@@ -114,8 +114,7 @@ public void setErrorChannelName(String errorChannelName) {
114114
}
115115

116116
/**
117-
* Return the error channel (if provided) to which error messages will
118-
* be routed.
117+
* Return the error channel (if provided) to which error messages will be routed.
119118
* @return the channel or null.
120119
* @since 4.3
121120
*/
@@ -130,8 +129,8 @@ public MessageChannel getErrorChannel() {
130129
}
131130

132131
/**
133-
* Configure the default timeout value to use for send operations.
134-
* May be overridden for individual messages.
132+
* Configure the default timeout value to use for send operations. May be overridden
133+
* for individual messages.
135134
* @param sendTimeout the send timeout in milliseconds
136135
* @see MessagingTemplate#setSendTimeout
137136
*/
@@ -145,8 +144,8 @@ public void setShouldTrack(boolean shouldTrack) {
145144
}
146145

147146
/**
148-
* Set an {@link ErrorMessageStrategy} to use to build an error message when a exception occurs.
149-
* Default is the {@link DefaultErrorMessageStrategy}.
147+
* Set an {@link ErrorMessageStrategy} to use to build an error message when a
148+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
150149
* @param errorMessageStrategy the {@link ErrorMessageStrategy}.
151150
* @since 4.3.10
152151
*/
@@ -155,6 +154,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
155154
this.errorMessageStrategy = errorMessageStrategy;
156155
}
157156

157+
/**
158+
* Get an {@link ErrorMessageStrategy} to use to build an error message when a
159+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
160+
* @return the errorMessageStrategy
161+
* @since 6.0
162+
*/
163+
protected ErrorMessageStrategy getErrorMessageStrategy() {
164+
return this.errorMessageStrategy;
165+
}
166+
158167
protected MessagingTemplate getMessagingTemplate() {
159168
return this.messagingTemplate;
160169
}
@@ -181,18 +190,16 @@ protected void onInit() {
181190
}
182191

183192
/**
184-
* Take no action by default.
185-
* Subclasses may override this if they
186-
* need lifecycle-managed behavior. Protected by 'lifecycleLock'.
193+
* Take no action by default. Subclasses may override this if they need
194+
* lifecycle-managed behavior. Protected by 'lifecycleLock'.
187195
*/
188196
@Override
189197
protected void doStart() {
190198
}
191199

192200
/**
193-
* Take no action by default.
194-
* Subclasses may override this if they
195-
* need lifecycle-managed behavior.
201+
* Take no action by default. Subclasses may override this if they need
202+
* lifecycle-managed behavior.
196203
*/
197204
@Override
198205
protected void doStop() {
@@ -217,12 +224,11 @@ protected void sendMessage(Message<?> messageArg) {
217224
protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
218225
MessageChannel channelForSubscription = getRequiredOutputChannel();
219226

220-
Flux<? extends Message<?>> messageFlux =
221-
Flux.from(publisher)
222-
.map(this::trackMessageIfAny)
223-
.doOnComplete(this::stop)
224-
.doOnCancel(this::stop)
225-
.takeWhile((message) -> isActive());
227+
Flux<? extends Message<?>> messageFlux = Flux.from(publisher)
228+
.map(this::trackMessageIfAny)
229+
.doOnComplete(this::stop)
230+
.doOnCancel(this::stop)
231+
.takeWhile((message) -> isActive());
226232

227233
if (channelForSubscription instanceof ReactiveStreamsSubscribableChannel) {
228234
((ReactiveStreamsSubscribableChannel) channelForSubscription).subscribeTo(messageFlux);
@@ -248,7 +254,8 @@ protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
248254
* @return true if the error channel is available and message sent.
249255
* @since 4.3.10
250256
*/
251-
protected final boolean sendErrorMessageIfNecessary(@Nullable Message<?> message, Exception exception) {
257+
protected final boolean sendErrorMessageIfNecessary(@Nullable
258+
Message<?> message, Exception exception) {
252259
MessageChannel channel = getErrorChannel();
253260
if (channel != null) {
254261
this.messagingTemplate.send(channel, buildErrorMessage(message, exception));
@@ -265,7 +272,8 @@ protected final boolean sendErrorMessageIfNecessary(@Nullable Message<?> message
265272
* @return the error message.
266273
* @since 4.3.10
267274
*/
268-
protected final ErrorMessage buildErrorMessage(@Nullable Message<?> message, Exception exception) {
275+
protected final ErrorMessage buildErrorMessage(@Nullable
276+
Message<?> message, Exception exception) {
269277
return this.errorMessageStrategy.buildErrorMessage(exception, getErrorMessageAttributes(message));
270278
}
271279

@@ -277,7 +285,8 @@ protected final ErrorMessage buildErrorMessage(@Nullable Message<?> message, Exc
277285
* @return the attributes.
278286
* @since 4.3.10
279287
*/
280-
protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
288+
protected AttributeAccessor getErrorMessageAttributes(@Nullable
289+
Message<?> message) {
281290
return ErrorMessageUtils.getAttributeAccessor(message, null);
282291
}
283292

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -299,8 +299,8 @@ public boolean isLoggingEnabled() {
299299
}
300300

301301
/**
302-
* Set an {@link ErrorMessageStrategy} to use to build an error message when a exception occurs.
303-
* Default is the {@link DefaultErrorMessageStrategy}.
302+
* Set an {@link ErrorMessageStrategy} to use to build an error message when a
303+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
304304
* @param errorMessageStrategy the {@link ErrorMessageStrategy}.
305305
* @since 4.3.10
306306
*/
@@ -309,6 +309,16 @@ public final void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrat
309309
this.errorMessageStrategy = errorMessageStrategy;
310310
}
311311

312+
/**
313+
* Get an {@link ErrorMessageStrategy} to use to build an error message when a
314+
* exception occurs. Default is the {@link DefaultErrorMessageStrategy}.
315+
* @return the errorMessageStrategy.
316+
* @since 6.0
317+
*/
318+
protected ErrorMessageStrategy getErrorMessageStrategy() {
319+
return this.errorMessageStrategy;
320+
}
321+
312322
@Override
313323
public ManagementOverrides getOverrides() {
314324
return this.managementOverrides;

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/dsl/KafkaMessageListenerContainerSpec.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.core.task.AsyncListenableTaskExecutor;
2525
import org.springframework.integration.dsl.IntegrationComponentSpec;
2626
import org.springframework.kafka.core.ConsumerFactory;
27+
import org.springframework.kafka.listener.CommonErrorHandler;
2728
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
2829
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.support.TopicPartitionOffset;
@@ -81,17 +82,15 @@ public KafkaMessageListenerContainerSpec<K, V> concurrency(int concurrency) {
8182
}
8283

8384
/**
84-
* Specify an {@link org.springframework.kafka.listener.ErrorHandler} for the
85+
* Specify an {@link org.springframework.kafka.listener.CommonErrorHandler} for the
8586
* {@link org.springframework.kafka.listener.AbstractMessageListenerContainer}.
86-
* @param errorHandler the {@link org.springframework.kafka.listener.ErrorHandler}.
87+
* @param errorHandler the {@link org.springframework.kafka.listener.CommonErrorHandler}.
8788
* @return the spec.
88-
* @see org.springframework.kafka.listener.ErrorHandler
89+
* @since 6.0
90+
* @see org.springframework.kafka.listener.CommonErrorHandler
8991
*/
90-
@SuppressWarnings("deprecation")
91-
public KafkaMessageListenerContainerSpec<K, V> errorHandler(
92-
org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler) {
93-
94-
this.target.setGenericErrorHandler(errorHandler);
92+
public KafkaMessageListenerContainerSpec<K, V> errorHandler(CommonErrorHandler errorHandler) {
93+
this.target.setCommonErrorHandler(errorHandler);
9594
return this;
9695
}
9796

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.inbound;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
21+
import org.springframework.core.AttributeAccessor;
22+
import org.springframework.integration.core.ErrorMessagePublisher;
23+
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
24+
import org.springframework.integration.support.ErrorMessageStrategy;
25+
import org.springframework.integration.support.ErrorMessageUtils;
26+
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
27+
import org.springframework.kafka.support.KafkaHeaders;
28+
import org.springframework.messaging.MessageChannel;
29+
30+
/**
31+
* An extension of {@link ErrorMessagePublisher} that can be used in a
32+
* {@link org.springframework.kafka.listener.CommonErrorHandler} for recovering Kafka
33+
* delivery failures.
34+
*
35+
* @author Gary Russell
36+
* @since 6.0
37+
*
38+
*/
39+
public class KafkaErrorSendingMessageRecoverer extends ErrorMessagePublisher implements ConsumerRecordRecoverer {
40+
41+
/**
42+
* Construct an instance to send to the channel with the
43+
* {@link RawRecordHeaderErrorMessageStrategy}.
44+
* @param channel the channel.
45+
*/
46+
public KafkaErrorSendingMessageRecoverer(MessageChannel channel) {
47+
this(channel, new RawRecordHeaderErrorMessageStrategy());
48+
}
49+
50+
/**
51+
* Construct an instance to send the channel, using the error message strategy.
52+
* @param channel the channel.
53+
* @param errorMessageStrategy the strategy.
54+
*/
55+
public KafkaErrorSendingMessageRecoverer(MessageChannel channel, ErrorMessageStrategy errorMessageStrategy) {
56+
setChannel(channel);
57+
setErrorMessageStrategy(errorMessageStrategy);
58+
}
59+
60+
@Override
61+
public void accept(ConsumerRecord<?, ?> record, Exception ex) {
62+
Throwable thrown = ex.getCause();
63+
if (thrown == null) {
64+
thrown = ex;
65+
}
66+
AttributeAccessor attrs = ErrorMessageUtils.getAttributeAccessor(null, null);
67+
attrs.setAttribute(KafkaHeaders.RAW_DATA, record);
68+
publish(thrown, attrs);
69+
}
70+
71+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.kafka.inbound;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
21+
import org.springframework.kafka.KafkaException;
22+
import org.springframework.kafka.support.Acknowledgment;
23+
import org.springframework.retry.RecoveryCallback;
24+
import org.springframework.retry.support.RetryTemplate;
25+
26+
/**
27+
* @author Gary Russell
28+
* @since 6.0
29+
*
30+
*/
31+
public interface KafkaInboundEndpoint {
32+
33+
/**
34+
* {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment
35+
* if the listener is capable of acknowledging.
36+
*/
37+
String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";
38+
39+
/**
40+
* {@link org.springframework.retry.RetryContext} attribute key for the consumer if
41+
* the listener is consumer-aware.
42+
*/
43+
String CONTEXT_CONSUMER = "consumer";
44+
45+
/**
46+
* {@link org.springframework.retry.RetryContext} attribute key for the record.
47+
*/
48+
String CONTEXT_RECORD = "record";
49+
50+
/**
51+
* Execute the runnable with the retry template and recovery callback.
52+
* @param template the template.
53+
* @param callback the callback.
54+
* @param record the record (or records).
55+
* @param acknowledgment the acknowledgment.
56+
* @param consumer the consumer.
57+
* @param runnable the runnable.
58+
*/
59+
default void doWithRetry(RetryTemplate template, RecoveryCallback<?> callback, Object data,
60+
Acknowledgment acknowledgment, Consumer<?, ?> consumer, Runnable runnable) {
61+
62+
try {
63+
template.execute(context -> {
64+
context.setAttribute(CONTEXT_RECORD, data);
65+
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
66+
context.setAttribute(CONTEXT_CONSUMER, consumer);
67+
runnable.run();
68+
return null;
69+
}, callback);
70+
}
71+
catch (Exception ex) {
72+
throw new KafkaException("Failed to execute runnable", ex);
73+
}
74+
}
75+
76+
}

0 commit comments

Comments
 (0)