Skip to content

Commit bbf26bc

Browse files
committed
Temporarily Restore Deprecations Affecting SIK
- Ease migration for spring-integration-kafka
1 parent dc47e72 commit bbf26bc

File tree

4 files changed

+230
-2
lines changed

4 files changed

+230
-2
lines changed

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

+27-1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.kafka.support.JavaUtils;
4747
import org.springframework.kafka.support.TopicPartitionOffset;
4848
import org.springframework.kafka.support.converter.MessageConverter;
49+
import org.springframework.retry.RecoveryCallback;
4950
import org.springframework.retry.support.RetryTemplate;
5051
import org.springframework.util.Assert;
5152

@@ -70,7 +71,6 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
7071

7172
private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null); // NOSONAR
7273

73-
@SuppressWarnings("deprecation")
7474
private org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler;
7575

7676
private CommonErrorHandler commonErrorHandler;
@@ -87,6 +87,10 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
8787

8888
private Boolean ackDiscarded;
8989

90+
private RetryTemplate retryTemplate;
91+
92+
private RecoveryCallback<? extends Object> recoveryCallback;
93+
9094
private Boolean statefulRetry;
9195

9296
private Boolean batchListener;
@@ -170,6 +174,25 @@ public void setAckDiscarded(Boolean ackDiscarded) {
170174
this.ackDiscarded = ackDiscarded;
171175
}
172176

177+
/**
178+
* Set a retryTemplate.
179+
* @param retryTemplate the template.
180+
* @deprecated since 2.8 - use a suitably configured error handler instead.
181+
*/
182+
@Deprecated
183+
public void setRetryTemplate(RetryTemplate retryTemplate) {
184+
this.retryTemplate = retryTemplate;
185+
}
186+
187+
/**
188+
* Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)
189+
* retryTemplate}.
190+
* @param recoveryCallback the callback.
191+
*/
192+
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
193+
this.recoveryCallback = recoveryCallback;
194+
}
195+
173196
/**
174197
* When using a {@link RetryTemplate} Set to true to enable stateful retry. Use in
175198
* conjunction with a
@@ -369,13 +392,16 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
369392
return instance;
370393
}
371394

395+
@SuppressWarnings("deprecation")
372396
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
373397
if (aklEndpoint.getRecordFilterStrategy() == null) {
374398
JavaUtils.INSTANCE
375399
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy);
376400
}
377401
JavaUtils.INSTANCE
378402
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
403+
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
404+
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
379405
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
380406
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
381407
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)

Diff for: spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
9494

9595
private boolean ackDiscarded;
9696

97+
private RetryTemplate retryTemplate;
98+
9799
private RecoveryCallback<? extends Object> recoveryCallback;
98100

99101
private boolean statefulRetry;
@@ -326,6 +328,34 @@ public void setAckDiscarded(boolean ackDiscarded) {
326328
this.ackDiscarded = ackDiscarded;
327329
}
328330

331+
@Nullable
332+
protected RetryTemplate getRetryTemplate() {
333+
return this.retryTemplate;
334+
}
335+
336+
/**
337+
* Set a retryTemplate.
338+
* @param retryTemplate the template.
339+
* @deprecated since 2.8 - use a suitably configured error handler instead.
340+
*/
341+
@Deprecated
342+
public void setRetryTemplate(RetryTemplate retryTemplate) {
343+
this.retryTemplate = retryTemplate;
344+
}
345+
346+
@Nullable
347+
protected RecoveryCallback<?> getRecoveryCallback() {
348+
return this.recoveryCallback;
349+
}
350+
351+
/**
352+
* Set a callback to be used with the {@link #setRetryTemplate(RetryTemplate)}.
353+
* @param recoveryCallback the callback.
354+
*/
355+
public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
356+
this.recoveryCallback = recoveryCallback;
357+
}
358+
329359
protected boolean isStatefulRetry() {
330360
return this.statefulRetry;
331361
}
@@ -498,7 +528,7 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
498528
protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
499529
@Nullable MessageConverter messageConverter);
500530

501-
@SuppressWarnings("unchecked")
531+
@SuppressWarnings({ "unchecked", "deprecation" })
502532
private void setupMessageListener(MessageListenerContainer container,
503533
@Nullable MessageConverter messageConverter) {
504534

@@ -511,6 +541,14 @@ private void setupMessageListener(MessageListenerContainer container,
511541
boolean isBatchListener = isBatchListener();
512542
Assert.state(messageListener != null,
513543
() -> "Endpoint [" + this + "] must provide a non null message listener");
544+
Assert.state(this.retryTemplate == null || !isBatchListener,
545+
"A 'RetryTemplate' is not supported with a batch listener; consider configuring the container "
546+
+ "with a suitably configured 'SeekToCurrentBatchErrorHandler' instead");
547+
if (this.retryTemplate != null) {
548+
messageListener = new org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter<>(
549+
(MessageListener<K, V>) messageListener,
550+
this.retryTemplate, this.recoveryCallback, this.statefulRetry);
551+
}
514552
if (this.recordFilterStrategy != null) {
515553
if (isBatchListener) {
516554
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2016-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
22+
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
23+
import org.springframework.kafka.listener.MessageListener;
24+
import org.springframework.kafka.support.Acknowledgment;
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.retry.RecoveryCallback;
27+
import org.springframework.retry.RetryState;
28+
import org.springframework.retry.support.DefaultRetryState;
29+
import org.springframework.retry.support.RetryTemplate;
30+
import org.springframework.util.Assert;
31+
32+
/**
33+
* A retrying message listener adapter for {@link MessageListener}s.
34+
*
35+
* @param <K> the key type.
36+
* @param <V> the value type.
37+
*
38+
* @author Gary Russell
39+
* @deprecated since 2.8 - use a suitably configured error handler instead.
40+
*
41+
*/
42+
@Deprecated
43+
public class RetryingMessageListenerAdapter<K, V>
44+
extends AbstractRetryingMessageListenerAdapter<K, V, MessageListener<K, V>>
45+
implements AcknowledgingConsumerAwareMessageListener<K, V> {
46+
47+
/**
48+
* {@link org.springframework.retry.RetryContext} attribute key for an acknowledgment
49+
* if the listener is capable of acknowledging.
50+
*/
51+
public static final String CONTEXT_ACKNOWLEDGMENT = "acknowledgment";
52+
53+
/**
54+
* {@link org.springframework.retry.RetryContext} attribute key for the consumer if
55+
* the listener is consumer-aware.
56+
*/
57+
public static final String CONTEXT_CONSUMER = "consumer";
58+
59+
/**
60+
* {@link org.springframework.retry.RetryContext} attribute key for the record.
61+
*/
62+
public static final String CONTEXT_RECORD = "record";
63+
64+
private boolean stateful;
65+
66+
/**
67+
* Construct an instance with the provided template and delegate. The exception will
68+
* be thrown to the container after retries are exhausted.
69+
* @param messageListener the delegate listener.
70+
* @param retryTemplate the template.
71+
*/
72+
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate) {
73+
this(messageListener, retryTemplate, null);
74+
}
75+
76+
/**
77+
* Construct an instance with the provided template, callback and delegate.
78+
* @param messageListener the delegate listener.
79+
* @param retryTemplate the template.
80+
* @param recoveryCallback the recovery callback; if null, the exception will be
81+
* thrown to the container after retries are exhausted.
82+
*/
83+
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
84+
@Nullable RecoveryCallback<? extends Object> recoveryCallback) {
85+
86+
this(messageListener, retryTemplate, recoveryCallback, false);
87+
}
88+
89+
/**
90+
* Construct an instance with the provided template, callback and delegate. When using
91+
* stateful retry, the retry context key is a concatenated String
92+
* {@code topic-partition-offset}. A
93+
* {@link org.springframework.kafka.listener.SeekToCurrentErrorHandler} is required in
94+
* the listener container because stateful retry will throw the exception to the
95+
* container for each delivery attempt.
96+
* @param messageListener the delegate listener.
97+
* @param retryTemplate the template.
98+
* @param recoveryCallback the recovery callback; if null, the exception will be
99+
* thrown to the container after retries are exhausted.
100+
* @param stateful true for stateful retry.
101+
* @since 2.1.3
102+
*/
103+
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
104+
@Nullable RecoveryCallback<? extends Object> recoveryCallback, boolean stateful) {
105+
106+
super(messageListener, retryTemplate, recoveryCallback);
107+
Assert.notNull(messageListener, "'messageListener' cannot be null");
108+
this.stateful = stateful;
109+
}
110+
111+
@Override
112+
public void onMessage(final ConsumerRecord<K, V> record, @Nullable final Acknowledgment acknowledgment,
113+
final Consumer<?, ?> consumer) {
114+
115+
RetryState retryState = null;
116+
if (this.stateful) {
117+
retryState = new DefaultRetryState(record.topic() + "-" + record.partition() + "-" + record.offset());
118+
}
119+
getRetryTemplate().execute(context -> {
120+
context.setAttribute(CONTEXT_RECORD, record);
121+
switch (RetryingMessageListenerAdapter.this.delegateType) {
122+
case ACKNOWLEDGING_CONSUMER_AWARE:
123+
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
124+
context.setAttribute(CONTEXT_CONSUMER, consumer);
125+
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
126+
break;
127+
case ACKNOWLEDGING:
128+
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
129+
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
130+
break;
131+
case CONSUMER_AWARE:
132+
context.setAttribute(CONTEXT_CONSUMER, consumer);
133+
RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
134+
break;
135+
case SIMPLE:
136+
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
137+
}
138+
return null;
139+
},
140+
getRecoveryCallback(), retryState);
141+
}
142+
143+
/*
144+
* Since the container uses the delegate's type to determine which method to call, we
145+
* must implement them all.
146+
*/
147+
148+
@Override
149+
public void onMessage(ConsumerRecord<K, V> data) {
150+
onMessage(data, null, null); // NOSONAR
151+
}
152+
153+
@Override
154+
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
155+
onMessage(data, acknowledgment, null); // NOSONAR
156+
}
157+
158+
@Override
159+
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
160+
onMessage(data, null, consumer);
161+
}
162+
163+
}

Diff for: spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/MicrometerHolderTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ void primary() {
9090
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(Config3.class);
9191
MicrometerHolder micrometerHolder = new MicrometerHolder(ctx, "holderName",
9292
"timerName", "timerDesc", Collections.emptyMap());
93+
@SuppressWarnings("unchecked")
9394
Map<String, Timer> meters = (Map<String, Timer>) ReflectionTestUtils.getField(micrometerHolder, "meters");
9495
assertThat(meters).hasSize(1);
9596
}

0 commit comments

Comments
 (0)