Skip to content

Commit e17a097

Browse files
cenkakinartembilan
authored andcommitted
GH-2554: Fix DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME value
Fixes #2554
1 parent 1dcf11d commit e17a097

File tree

2 files changed

+256
-2
lines changed

2 files changed

+256
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,6 +62,6 @@ private RetryTopicBeanNames() {
6262
* The bean name of the internally registered scheduler wrapper, if needed.
6363
*/
6464
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
65-
"defaultRetryTopicKafkaTemplate";
65+
"defaultRetryTopicSchedulerWrapper";
6666

6767
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
/*
2+
* Copyright 2021-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.fail;
21+
import static org.mockito.Mockito.spy;
22+
import static org.mockito.Mockito.verify;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
29+
import org.apache.kafka.clients.consumer.ConsumerConfig;
30+
import org.apache.kafka.clients.producer.ProducerConfig;
31+
import org.apache.kafka.common.serialization.StringDeserializer;
32+
import org.apache.kafka.common.serialization.StringSerializer;
33+
import org.junit.jupiter.api.Test;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import org.springframework.beans.factory.annotation.Autowired;
38+
import org.springframework.context.annotation.Bean;
39+
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.kafka.annotation.DltHandler;
41+
import org.springframework.kafka.annotation.EnableKafka;
42+
import org.springframework.kafka.annotation.KafkaListener;
43+
import org.springframework.kafka.annotation.RetryableTopic;
44+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
45+
import org.springframework.kafka.core.ConsumerFactory;
46+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
47+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
48+
import org.springframework.kafka.core.KafkaTemplate;
49+
import org.springframework.kafka.core.ProducerFactory;
50+
import org.springframework.kafka.listener.ContainerProperties;
51+
import org.springframework.kafka.listener.DefaultErrorHandler;
52+
import org.springframework.kafka.support.KafkaHeaders;
53+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
54+
import org.springframework.kafka.test.context.EmbeddedKafka;
55+
import org.springframework.messaging.handler.annotation.Header;
56+
import org.springframework.retry.annotation.Backoff;
57+
import org.springframework.stereotype.Component;
58+
import org.springframework.test.annotation.DirtiesContext;
59+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
60+
import org.springframework.util.backoff.FixedBackOff;
61+
62+
/**
63+
* @author Cenk Akin
64+
* @since 3.0.3
65+
*/
66+
@SpringJUnitConfig
67+
@DirtiesContext
68+
@EmbeddedKafka(topics = {RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC,
69+
RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC}, partitions = 1)
70+
public class RetryTopicMultipleListenerIntegrationTests {
71+
72+
private static final Logger logger = LoggerFactory.getLogger(RetryTopicMultipleListenerIntegrationTests.class);
73+
74+
public final static String FIRST_TOPIC = "myRetryTopic1";
75+
76+
public final static String SECOND_TOPIC = "myRetryTopic2";
77+
78+
@Autowired
79+
private KafkaTemplate<String, String> sendKafkaTemplate;
80+
81+
@Autowired
82+
private CountDownLatchContainer latchContainer;
83+
84+
@Test
85+
void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) {
86+
logger.debug("Sending message to topic " + FIRST_TOPIC);
87+
sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
88+
logger.debug("Sending message to topic " + SECOND_TOPIC);
89+
sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2");
90+
assertThat(awaitLatch(latchContainer.firstCountDownLatch)).isTrue();
91+
assertThat(awaitLatch(latchContainer.firstCountDownLatchDlt)).isTrue();
92+
assertThat(awaitLatch(latchContainer.secondCountDownLatch)).isTrue();
93+
assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue();
94+
verify(componentFactory).destinationTopicResolver();
95+
}
96+
97+
private boolean awaitLatch(CountDownLatch latch) {
98+
try {
99+
return latch.await(150, TimeUnit.SECONDS);
100+
}
101+
catch (Exception e) {
102+
fail(e.getMessage());
103+
throw new RuntimeException(e);
104+
}
105+
}
106+
107+
@Component
108+
static class FirstKafkaListener {
109+
110+
@Autowired
111+
CountDownLatchContainer countDownLatchContainer;
112+
113+
@RetryableTopic(
114+
attempts = "4",
115+
backoff = @Backoff(delay = 10, multiplier = 2.0),
116+
autoCreateTopics = "false",
117+
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
118+
@KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC)
119+
public void firstListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
120+
countDownLatchContainer.firstCountDownLatch.countDown();
121+
logger.warn(in + " from " + topic);
122+
throw new RuntimeException("test");
123+
}
124+
125+
@DltHandler
126+
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
127+
countDownLatchContainer.firstCountDownLatchDlt.countDown();
128+
logger.warn(in + " from " + topic);
129+
}
130+
}
131+
132+
@Component
133+
static class SecondKafkaListener {
134+
135+
@Autowired
136+
CountDownLatchContainer countDownLatchContainer;
137+
138+
@RetryableTopic
139+
@KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC)
140+
public void secondListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
141+
countDownLatchContainer.secondCountDownLatch.countDown();
142+
logger.info(in + " from " + topic);
143+
throw new RuntimeException("another test");
144+
}
145+
146+
@DltHandler
147+
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
148+
countDownLatchContainer.secondCountDownLatchDlt.countDown();
149+
logger.warn(in + " from " + topic);
150+
}
151+
}
152+
153+
@Component
154+
static class CountDownLatchContainer {
155+
156+
CountDownLatch firstCountDownLatch = new CountDownLatch(4);
157+
CountDownLatch secondCountDownLatch = new CountDownLatch(3);
158+
CountDownLatch firstCountDownLatchDlt = new CountDownLatch(1);
159+
160+
CountDownLatch secondCountDownLatchDlt = new CountDownLatch(1);
161+
CountDownLatch customizerLatch = new CountDownLatch(6);
162+
}
163+
164+
@EnableKafka
165+
@Configuration
166+
static class Config {
167+
168+
@Autowired
169+
EmbeddedKafkaBroker broker;
170+
171+
@Bean
172+
CountDownLatchContainer latchContainer() {
173+
return new CountDownLatchContainer();
174+
}
175+
176+
@Bean
177+
FirstKafkaListener firstKafkaListener() {
178+
return new FirstKafkaListener();
179+
}
180+
181+
@Bean
182+
SecondKafkaListener secondKafkaListener() {
183+
return new SecondKafkaListener();
184+
}
185+
186+
@Bean
187+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
188+
ConsumerFactory<String, String> consumerFactory, CountDownLatchContainer latchContainer) {
189+
190+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
191+
factory.setConsumerFactory(consumerFactory);
192+
ContainerProperties props = factory.getContainerProperties();
193+
props.setIdleEventInterval(100L);
194+
props.setPollTimeout(50L);
195+
props.setIdlePartitionEventInterval(100L);
196+
factory.setConsumerFactory(consumerFactory);
197+
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
198+
(cr, ex) -> latchContainer.secondCountDownLatch.countDown(),
199+
new FixedBackOff(0, 2));
200+
factory.setCommonErrorHandler(errorHandler);
201+
factory.setConcurrency(1);
202+
factory.setContainerCustomizer(
203+
container -> latchContainer.customizerLatch.countDown());
204+
return factory;
205+
}
206+
207+
@Bean
208+
public ProducerFactory<String, String> producerFactory() {
209+
Map<String, Object> configProps = new HashMap<>();
210+
configProps.put(
211+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
212+
this.broker.getBrokersAsString());
213+
configProps.put(
214+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
215+
StringSerializer.class);
216+
configProps.put(
217+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
218+
StringSerializer.class);
219+
return new DefaultKafkaProducerFactory<>(configProps);
220+
}
221+
222+
@Bean
223+
public KafkaTemplate<String, String> kafkaTemplate() {
224+
return new KafkaTemplate<>(producerFactory());
225+
}
226+
227+
@Bean
228+
public ConsumerFactory<String, String> consumerFactory() {
229+
Map<String, Object> props = new HashMap<>();
230+
props.put(
231+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
232+
this.broker.getBrokersAsString());
233+
props.put(
234+
ConsumerConfig.GROUP_ID_CONFIG,
235+
"groupId");
236+
props.put(
237+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
238+
StringDeserializer.class);
239+
props.put(
240+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
241+
StringDeserializer.class);
242+
props.put(
243+
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
244+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
245+
246+
return new DefaultKafkaConsumerFactory<>(props);
247+
}
248+
249+
@Bean
250+
RetryTopicComponentFactory componentFactory() {
251+
return spy(new RetryTopicComponentFactory());
252+
}
253+
}
254+
}

0 commit comments

Comments
 (0)