Skip to content

Commit 8b8a7a4

Browse files
authored
Fix DSL for inner bean names generation (#8639)
The `IntegrationFlowBeanPostProcessor.processIntegrationComponentSpec()` uses a wrong `generateBeanName()` for component to register making the provided `id` as a prefix * Use `generateBeanName(Object instance, String prefix, @nullable String fallbackId, boolean useFlowIdAsPrefix)` instead to properly "fallback" to the provided name **Cherry-pick to `6.1.x`**
1 parent 79408d7 commit 8b8a7a4

File tree

2 files changed

+18
-12
lines changed

2 files changed

+18
-12
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/context/IntegrationFlowBeanPostProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ private void processIntegrationComponentSpec(String beanName, IntegrationCompone
356356
.filter(component -> noBeanPresentForComponent(component.getKey(), beanName))
357357
.forEach(component ->
358358
registerComponent(component.getKey(),
359-
generateBeanName(component.getKey(), component.getValue())));
359+
generateBeanName(component.getKey(), beanName, component.getValue(), false)));
360360

361361
}
362362
}

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/dsl/KafkaDslTests.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,10 @@ void testGateways() throws Exception {
244244
}
245245

246246
@Test
247-
void channels(@Autowired MessageChannel topic6Channel, @Autowired PollableKafkaChannel topic8Channel,
248-
@Autowired PollableKafkaChannel topic9Channel) {
247+
void channels(@Qualifier("topic6Channel") MessageChannel topic6Channel,
248+
@Qualifier("topic8Channel") PollableKafkaChannel topic8Channel,
249+
@Qualifier("topic9Channel") PollableKafkaChannel topic9Channel) {
250+
249251
topic6Channel.send(new GenericMessage<>("foo"));
250252
Message<?> received = topic8Channel.receive();
251253
assertThat(received)
@@ -344,7 +346,8 @@ public PollableChannel futuresChannel() {
344346
}
345347

346348
@Bean
347-
public IntegrationFlow sendToKafkaFlow() {
349+
public IntegrationFlow sendToKafkaFlow(
350+
KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandlerTopic2) {
348351
return f -> f
349352
.<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
350353
.enrichHeaders(h -> h.header(KafkaIntegrationHeaders.FUTURE_TOKEN, "foo"))
@@ -353,17 +356,15 @@ public IntegrationFlow sendToKafkaFlow() {
353356
kafkaMessageHandler(producerFactory(), TEST_TOPIC1)
354357
.timestampExpression("T(Long).valueOf('1487694048633')"),
355358
e -> e.id("kafkaProducer1")))
356-
.subscribe(sf -> sf.handle(
357-
kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
358-
.flush(msg -> true)
359-
.timestamp(m -> 1487694048644L),
360-
e -> e.id("kafkaProducer2")))
359+
.subscribe(sf -> sf.handle(kafkaMessageHandlerTopic2, e -> e.id("kafkaProducer2")))
361360
);
362361
}
363362

364363
@Bean
365-
public DefaultKafkaHeaderMapper mapper() {
366-
return new DefaultKafkaHeaderMapper();
364+
public KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandlerTopic2() {
365+
return kafkaMessageHandler(producerFactory(), TEST_TOPIC2)
366+
.flush(msg -> true)
367+
.timestamp(m -> 1487694048644L);
367368
}
368369

369370
private KafkaProducerMessageHandlerSpec<Integer, String, ?> kafkaMessageHandler(
@@ -382,6 +383,11 @@ public DefaultKafkaHeaderMapper mapper() {
382383
.configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic));
383384
}
384385

386+
@Bean
387+
public DefaultKafkaHeaderMapper mapper() {
388+
return new DefaultKafkaHeaderMapper();
389+
}
390+
385391

386392
@Bean
387393
public IntegrationFlow sourceFlow() {
@@ -427,7 +433,7 @@ public KafkaMessageSource<Integer, String> channelSource(ConsumerFactory<Integer
427433
@Bean
428434
public IntegrationFlow channels(KafkaTemplate<Integer, String> template,
429435
ConcurrentKafkaListenerContainerFactory<Integer, String> containerFactory,
430-
KafkaMessageSource<?, ?> channelSource,
436+
@Qualifier("channelSource") KafkaMessageSource<?, ?> channelSource,
431437
PublishSubscribeKafkaChannel publishSubscribeKafkaChannel) {
432438

433439
return IntegrationFlow.from(topic6Channel(template, containerFactory))

0 commit comments

Comments
 (0)