18
18
19
19
import java .util .Arrays ;
20
20
import java .util .Map ;
21
- import java .util .Objects ;
22
21
import java .util .stream .Collectors ;
23
22
24
23
import org .apache .kafka .clients .consumer .Consumer ;
35
34
import org .springframework .messaging .converter .MessageConverter ;
36
35
import org .springframework .messaging .converter .SimpleMessageConverter ;
37
36
import org .springframework .messaging .support .GenericMessage ;
37
+ import org .springframework .util .Assert ;
38
38
39
39
/**
40
40
* A {@link AcknowledgingConsumerAwareMessageListener} adapter that implements
49
49
* @since 3.0
50
50
* @see AcknowledgingConsumerAwareMessageListener
51
51
*/
52
- public class ConvertingAndDelegatingMessageListenerAdapter <T , U , V > implements AcknowledgingConsumerAwareMessageListener <T , U > {
52
+ public class ConvertingMessageListener <T , U , V > implements AcknowledgingConsumerAwareMessageListener <T , U > {
53
53
54
- private final Object delegate ;
54
+ private final MessageListener < T , V > delegate ;
55
55
private final MessageConverter messageConverter ;
56
56
private final Class <V > desiredValueType ;
57
57
@@ -63,9 +63,9 @@ public class ConvertingAndDelegatingMessageListenerAdapter<T, U, V> implements A
63
63
* @param delegate the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
64
64
* @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
65
65
*/
66
- public ConvertingAndDelegatingMessageListenerAdapter ( Object delegate , Class <V > desiredValueType ) {
67
- validateMessageListener (delegate );
68
- Objects . requireNonNull (desiredValueType );
66
+ public ConvertingMessageListener ( MessageListener < T , V > delegate , Class <V > desiredValueType ) {
67
+ Assert . notNull (delegate , "Delegate message listener cannot be null" );
68
+ Assert . notNull (desiredValueType , "Desired value type cannot be null" );
69
69
70
70
this .delegate = delegate ;
71
71
this .desiredValueType = desiredValueType ;
@@ -81,38 +81,30 @@ public ConvertingAndDelegatingMessageListenerAdapter(Object delegate, Class<V> d
81
81
* @param messageConverter the {@link MessageConverter} to use for conversion.
82
82
* @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
83
83
*/
84
- public ConvertingAndDelegatingMessageListenerAdapter ( Object delegate , MessageConverter messageConverter , Class <V > desiredValueType ) {
85
- validateMessageListener (delegate );
86
- Objects . requireNonNull (messageConverter );
87
- Objects . requireNonNull (desiredValueType );
84
+ public ConvertingMessageListener ( MessageListener < T , V > delegate , MessageConverter messageConverter , Class <V > desiredValueType ) {
85
+ Assert . notNull (delegate , "Delegate message listener cannot be null" );
86
+ Assert . notNull (messageConverter , "Message converter cannot be null" );
87
+ Assert . notNull (desiredValueType , "Desired value type cannot be null" );
88
88
89
89
this .delegate = delegate ;
90
90
this .messageConverter = messageConverter ;
91
91
this .desiredValueType = desiredValueType ;
92
92
}
93
93
94
- private void validateMessageListener (Object messageListener ) {
95
- Objects .requireNonNull (messageListener );
96
- if (!(messageListener instanceof MessageListener )) {
97
- throw new IllegalArgumentException ("Passed message listener must be of MessageListener type" );
98
- }
99
- }
100
-
101
94
@ Override
102
95
public void onMessage (ConsumerRecord <T , U > data , Acknowledgment acknowledgment , Consumer <?, ?> consumer ) { // NOSONAR
103
96
ConsumerRecord <T , V > convertedConsumerRecord = convertConsumerRecord (data );
104
97
if (this .delegate instanceof AcknowledgingConsumerAwareMessageListener ) {
105
- (( AcknowledgingConsumerAwareMessageListener < T , V >) this .delegate ) .onMessage (convertedConsumerRecord , acknowledgment , consumer );
98
+ this .delegate .onMessage (convertedConsumerRecord , acknowledgment , consumer );
106
99
}
107
100
else if (this .delegate instanceof ConsumerAwareMessageListener ) {
108
- (( ConsumerAwareMessageListener < T , V >) this .delegate ) .onMessage (convertedConsumerRecord , consumer );
101
+ this .delegate .onMessage (convertedConsumerRecord , consumer );
109
102
}
110
103
else if (this .delegate instanceof AcknowledgingMessageListener ) {
111
- ((AcknowledgingMessageListener <T , V >) this .delegate ).onMessage (convertedConsumerRecord , acknowledgment );
112
- }
113
- else if (this .delegate instanceof MessageListener ) {
114
- ((MessageListener <T , V >) this .delegate ).onMessage (convertedConsumerRecord );
104
+ this .delegate .onMessage (convertedConsumerRecord , acknowledgment );
115
105
}
106
+
107
+ this .delegate .onMessage (convertedConsumerRecord );
116
108
}
117
109
118
110
private ConsumerRecord <T , V > convertConsumerRecord (ConsumerRecord <T , U > data ) { // NOSONAR
0 commit comments