184
184
"annotated25" , "annotated25reply1" , "annotated25reply2" , "annotated26" , "annotated27" , "annotated28" ,
185
185
"annotated29" , "annotated30" , "annotated30reply" , "annotated31" , "annotated32" , "annotated33" ,
186
186
"annotated34" , "annotated35" , "annotated36" , "annotated37" , "foo" , "manualStart" , "seekOnIdle" ,
187
- "annotated38" , "annotated38reply" , "annotated39" , "annotated40" , "annotated41" , "annotated42" })
187
+ "annotated38" , "annotated38reply" , "annotated39" , "annotated40" , "annotated41" , "annotated42" ,
188
+ "annotated43" , "annotated43reply" })
188
189
@ TestPropertySource (properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10" )
189
190
public class EnableKafkaIntegrationTests {
190
191
@@ -438,6 +439,15 @@ public void testInterface() throws Exception {
438
439
template .send ("annotated7" , 0 , "foo" );
439
440
template .flush ();
440
441
assertThat (this .ifaceListener .getLatch1 ().await (60 , TimeUnit .SECONDS )).isTrue ();
442
+ Map <String , Object > consumerProps = new HashMap <>(this .consumerFactory .getConfigurationProperties ());
443
+ consumerProps .put (ConsumerConfig .GROUP_ID_CONFIG , "testInterface" );
444
+ ConsumerFactory <Integer , String > cf = new DefaultKafkaConsumerFactory <>(consumerProps );
445
+ Consumer <Integer , String > consumer = cf .createConsumer ();
446
+ this .embeddedKafka .consumeFromAnEmbeddedTopic (consumer , "annotated43reply" );
447
+ template .send ("annotated43" , 0 , "foo" );
448
+ ConsumerRecord <Integer , String > reply = KafkaTestUtils .getSingleRecord (consumer , "annotated43reply" );
449
+ assertThat (reply ).extracting (rec -> rec .value ()).isEqualTo ("FOO" );
450
+ consumer .close ();
441
451
}
442
452
443
453
@ Test
@@ -1375,8 +1385,8 @@ public MultiListenerNoDefault multiNoDefault() {
1375
1385
}
1376
1386
1377
1387
@ Bean
1378
- public MultiListenerSendTo multiListenerSendTo () {
1379
- return new MultiListenerSendTo ();
1388
+ public MultiListenerSendToImpl multiListenerSendTo () {
1389
+ return new MultiListenerSendToImpl ();
1380
1390
}
1381
1391
1382
1392
@ Bean
@@ -2290,6 +2300,10 @@ interface IfaceListener<T> {
2290
2300
2291
2301
void listen (T foo );
2292
2302
2303
+ @ SendTo ("annotated43reply" )
2304
+ @ KafkaListener (id = "ifcR" , topics = "annotated43" )
2305
+ String reply (String in );
2306
+
2293
2307
}
2294
2308
2295
2309
static class IfaceListenerImpl implements IfaceListener <String > {
@@ -2310,6 +2324,11 @@ public void listenTx(String foo) {
2310
2324
latch2 .countDown ();
2311
2325
}
2312
2326
2327
+ @ Override
2328
+ public String reply (String in ) {
2329
+ return in .toUpperCase ();
2330
+ }
2331
+
2313
2332
public CountDownLatch getLatch1 () {
2314
2333
return latch1 ;
2315
2334
}
@@ -2416,15 +2435,25 @@ public void bar(@Valid ValidatedClass val) {
2416
2435
2417
2436
@ KafkaListener (id = "multiSendTo" , topics = "annotated25" )
2418
2437
@ SendTo ("annotated25reply1" )
2419
- static class MultiListenerSendTo {
2438
+ interface MultiListenerSendTo {
2439
+
2440
+ @ KafkaHandler
2441
+ String foo (String in );
2420
2442
2421
2443
@ KafkaHandler
2444
+ @ SendTo ("!{'annotated25reply2'}" )
2445
+ String bar (KafkaNull nul , int key );
2446
+
2447
+ }
2448
+
2449
+ static class MultiListenerSendToImpl implements MultiListenerSendTo {
2450
+
2451
+ @ Override
2422
2452
public String foo (String in ) {
2423
2453
return in .toUpperCase ();
2424
2454
}
2425
2455
2426
- @ KafkaHandler
2427
- @ SendTo ("!{'annotated25reply2'}" )
2456
+ @ Override
2428
2457
public String bar (@ Payload (required = false ) KafkaNull nul ,
2429
2458
@ Header (KafkaHeaders .RECEIVED_MESSAGE_KEY ) int key ) {
2430
2459
return "BAR" ;
0 commit comments