30
30
import org .springframework .kafka .listener .MessageListener ;
31
31
import org .springframework .kafka .support .Acknowledgment ;
32
32
import org .springframework .messaging .Message ;
33
+ import org .springframework .messaging .converter .GenericMessageConverter ;
33
34
import org .springframework .messaging .converter .MessageConversionException ;
34
35
import org .springframework .messaging .converter .MessageConverter ;
35
- import org .springframework .messaging .converter .SimpleMessageConverter ;
36
36
import org .springframework .messaging .support .GenericMessage ;
37
37
import org .springframework .util .Assert ;
38
38
41
41
* converting received {@link ConsumerRecord} using specified {@link MessageConverter}
42
42
* and then passes result to specified {@link MessageListener}.
43
43
*
44
- * @param <T> the key type.
45
- * @param <U> the value type.
46
44
* @param <V> the desired value type after conversion.
47
45
*
48
46
* @author Adrian Chlebosz
49
47
* @since 3.0
50
48
* @see AcknowledgingConsumerAwareMessageListener
51
49
*/
52
- public class ConvertingMessageListener <T , U , V > implements AcknowledgingConsumerAwareMessageListener <T , U > {
50
+ @ SuppressWarnings ("rawtypes" )
51
+ public class ConvertingMessageListener <V > implements AcknowledgingConsumerAwareMessageListener <Object , Object > {
53
52
54
- private final MessageListener <T , V > delegate ;
55
- private final MessageConverter messageConverter ;
53
+ private final MessageListener delegate ;
56
54
private final Class <V > desiredValueType ;
57
55
56
+ private MessageConverter messageConverter ;
57
+
58
58
/**
59
59
* Construct an instance with the provided {@link MessageListener} and {@link Class}
60
60
* as a desired type of {@link ConsumerRecord}'s value after conversion. Default value of
61
- * {@link MessageConverter} is used, which is {@link SimpleMessageConverter }.
61
+ * {@link MessageConverter} is used, which is {@link GenericMessageConverter }.
62
62
*
63
- * @param delegate the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
63
+ * @param delegateMessageListener the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
64
64
* @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
65
65
*/
66
- public ConvertingMessageListener (MessageListener <T , V > delegate , Class <V > desiredValueType ) {
67
- Assert .notNull (delegate , "Delegate message listener cannot be null" );
68
- Assert .notNull (desiredValueType , "Desired value type cannot be null" );
69
-
70
- this .delegate = delegate ;
66
+ public ConvertingMessageListener (MessageListener <?, V > delegateMessageListener , Class <V > desiredValueType ) {
67
+ Assert .notNull (delegateMessageListener , "'delegateMessageListener' cannot be null" );
68
+ Assert .notNull (desiredValueType , "'desiredValueType' cannot be null" );
69
+ this .delegate = delegateMessageListener ;
71
70
this .desiredValueType = desiredValueType ;
72
71
73
- this .messageConverter = new SimpleMessageConverter ();
72
+ this .messageConverter = new GenericMessageConverter ();
74
73
}
75
74
76
75
/**
77
- * Construct an instance with the provided {@link MessageListener}, {@link MessageConverter} and {@link Class}
78
- * as a desired type of {@link ConsumerRecord}'s value after conversion.
79
- *
80
- * @param delegate the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
81
- * @param messageConverter the {@link MessageConverter} to use for conversion.
82
- * @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
76
+ * Set a {@link MessageConverter}.
77
+ * @param messageConverter the message converter to use for conversion of incoming {@link ConsumerRecord}.
78
+ * @since 3.0
83
79
*/
84
- public ConvertingMessageListener (MessageListener <T , V > delegate , MessageConverter messageConverter , Class <V > desiredValueType ) {
85
- Assert .notNull (delegate , "Delegate message listener cannot be null" );
86
- Assert .notNull (messageConverter , "Message converter cannot be null" );
87
- Assert .notNull (desiredValueType , "Desired value type cannot be null" );
88
-
89
- this .delegate = delegate ;
80
+ public void setMessageConverter (MessageConverter messageConverter ) {
81
+ Assert .notNull (messageConverter , "'messageConverter' cannot be null" );
90
82
this .messageConverter = messageConverter ;
91
- this .desiredValueType = desiredValueType ;
92
83
}
93
84
94
85
@ Override
95
- public void onMessage (ConsumerRecord <T , U > data , Acknowledgment acknowledgment , Consumer <?, ?> consumer ) { // NOSONAR
96
- ConsumerRecord <T , V > convertedConsumerRecord = convertConsumerRecord (data );
86
+ @ SuppressWarnings ("unchecked" )
87
+ public void onMessage (ConsumerRecord data , Acknowledgment acknowledgment , Consumer consumer ) {
88
+ ConsumerRecord convertedConsumerRecord = convertConsumerRecord (data );
97
89
if (this .delegate instanceof AcknowledgingConsumerAwareMessageListener ) {
98
90
this .delegate .onMessage (convertedConsumerRecord , acknowledgment , consumer );
99
91
}
@@ -107,13 +99,13 @@ else if (this.delegate instanceof AcknowledgingMessageListener) {
107
99
this .delegate .onMessage (convertedConsumerRecord );
108
100
}
109
101
110
- private ConsumerRecord < T , V > convertConsumerRecord (ConsumerRecord < T , U > data ) { // NOSONAR
102
+ private ConsumerRecord convertConsumerRecord (ConsumerRecord data ) {
111
103
Header [] headerArray = data .headers ().toArray ();
112
104
Map <String , Object > headerMap = Arrays .stream (headerArray )
113
105
.collect (Collectors .toMap (Header ::key , Header ::value ));
114
106
115
- Message < U > message = new GenericMessage <>(data .value (), headerMap );
116
- V converted = ( V ) this .messageConverter .fromMessage (message , this .desiredValueType );
107
+ Message message = new GenericMessage <>(data .value (), headerMap );
108
+ Object converted = this .messageConverter .fromMessage (message , this .desiredValueType );
117
109
118
110
if (converted == null ) {
119
111
throw new MessageConversionException (message , "Message cannot be converted by used MessageConverter" );
@@ -122,7 +114,7 @@ private ConsumerRecord<T, V> convertConsumerRecord(ConsumerRecord<T, U> data) {
122
114
return rebuildConsumerRecord (data , converted );
123
115
}
124
116
125
- private ConsumerRecord < T , V > rebuildConsumerRecord (ConsumerRecord < T , U > data , V converted ) {
117
+ private static ConsumerRecord rebuildConsumerRecord (ConsumerRecord data , Object converted ) {
126
118
return new ConsumerRecord <>(
127
119
data .topic (),
128
120
data .partition (),
0 commit comments