Skip to content

Commit 8034c2c

Browse files
breader124Adrian Chlebosz
and
Adrian Chlebosz
authored
GH-2068: Add lightweight converting adapter (#2165)
* GH-2068: Add lightweight converting adapter * GH-2068: Introduce amendments after code review * GH-2068: Introduce further amendments after code review * GH-2068: Extend content of copied record by more metadata and fix doubled invokation of message listener * GH-2068: Add possibility to specify header mapper and cover it with tests Co-authored-by: Adrian Chlebosz <[email protected]>
1 parent c9b74e1 commit 8034c2c

File tree

2 files changed

+360
-0
lines changed

2 files changed

+360
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.clients.consumer.Consumer;
23+
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
25+
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
26+
import org.springframework.kafka.listener.AcknowledgingMessageListener;
27+
import org.springframework.kafka.listener.ConsumerAwareMessageListener;
28+
import org.springframework.kafka.listener.DelegatingMessageListener;
29+
import org.springframework.kafka.listener.MessageListener;
30+
import org.springframework.kafka.support.Acknowledgment;
31+
import org.springframework.kafka.support.KafkaHeaderMapper;
32+
import org.springframework.messaging.Message;
33+
import org.springframework.messaging.converter.GenericMessageConverter;
34+
import org.springframework.messaging.converter.MessageConversionException;
35+
import org.springframework.messaging.converter.MessageConverter;
36+
import org.springframework.messaging.support.GenericMessage;
37+
import org.springframework.util.Assert;
38+
39+
/**
40+
* A {@link AcknowledgingConsumerAwareMessageListener} adapter that implements
41+
* converting received {@link ConsumerRecord} using specified {@link MessageConverter}
42+
* and then passes result to specified {@link MessageListener}. If directly set, also headers
43+
* can be mapped with implementation of {@link KafkaHeaderMapper} and then passed to converter
44+
* as a part of message being actually processed. Otherwise, if header mapper is not specified,
45+
* headers will not be accessible from converter's perspective.
46+
*
47+
* @param <V> the desired value type after conversion.
48+
*
49+
* @author Adrian Chlebosz
50+
* @since 3.0
51+
* @see DelegatingMessageListener
52+
* @see AcknowledgingConsumerAwareMessageListener
53+
*/
54+
@SuppressWarnings("rawtypes")
55+
public class ConvertingMessageListener<V> implements DelegatingMessageListener<MessageListener>, AcknowledgingConsumerAwareMessageListener<Object, Object> {
56+
57+
private final MessageListener delegate;
58+
private final Class<V> desiredValueType;
59+
60+
private MessageConverter messageConverter;
61+
private KafkaHeaderMapper headerMapper;
62+
63+
/**
64+
* Construct an instance with the provided {@link MessageListener} and {@link Class}
65+
* as a desired type of {@link ConsumerRecord}'s value after conversion. Default value of
66+
* {@link MessageConverter} is used, which is {@link GenericMessageConverter}.
67+
*
68+
* @param delegateMessageListener the {@link MessageListener} to use when passing converted {@link ConsumerRecord} further.
69+
* @param desiredValueType the {@link Class} setting desired type of {@link ConsumerRecord}'s value.
70+
*/
71+
public ConvertingMessageListener(MessageListener<?, V> delegateMessageListener, Class<V> desiredValueType) {
72+
Assert.notNull(delegateMessageListener, "'delegateMessageListener' cannot be null");
73+
Assert.notNull(desiredValueType, "'desiredValueType' cannot be null");
74+
this.delegate = delegateMessageListener;
75+
this.desiredValueType = desiredValueType;
76+
77+
this.messageConverter = new GenericMessageConverter();
78+
}
79+
80+
/**
81+
* Set a {@link MessageConverter}.
82+
* @param messageConverter the message converter to use for conversion of incoming {@link ConsumerRecord}.
83+
* @since 3.0
84+
*/
85+
public void setMessageConverter(MessageConverter messageConverter) {
86+
Assert.notNull(messageConverter, "'messageConverter' cannot be null");
87+
this.messageConverter = messageConverter;
88+
}
89+
90+
/**
91+
* Set a {@link KafkaHeaderMapper}.
92+
* @param headerMapper the header mapper to use for mapping headers of incoming {@link ConsumerRecord}.
93+
* @since 3.0
94+
*/
95+
public void setKafkaHeaderMapper(KafkaHeaderMapper headerMapper) {
96+
Assert.notNull(headerMapper, "'headerMapper' cannot be null");
97+
this.headerMapper = headerMapper;
98+
}
99+
100+
@Override
101+
public MessageListener getDelegate() {
102+
return this.delegate;
103+
}
104+
105+
@Override
106+
@SuppressWarnings("unchecked")
107+
public void onMessage(ConsumerRecord receivedRecord, Acknowledgment acknowledgment, Consumer consumer) {
108+
ConsumerRecord convertedConsumerRecord = convertConsumerRecord(receivedRecord);
109+
if (this.delegate instanceof AcknowledgingConsumerAwareMessageListener) {
110+
this.delegate.onMessage(convertedConsumerRecord, acknowledgment, consumer);
111+
}
112+
else if (this.delegate instanceof ConsumerAwareMessageListener) {
113+
this.delegate.onMessage(convertedConsumerRecord, consumer);
114+
}
115+
else if (this.delegate instanceof AcknowledgingMessageListener) {
116+
this.delegate.onMessage(convertedConsumerRecord, acknowledgment);
117+
}
118+
else {
119+
this.delegate.onMessage(convertedConsumerRecord);
120+
}
121+
}
122+
123+
private ConsumerRecord convertConsumerRecord(ConsumerRecord receivedRecord) {
124+
Map<String, Object> headerMap = null;
125+
if (this.headerMapper != null) {
126+
headerMap = new HashMap<>();
127+
this.headerMapper.toHeaders(receivedRecord.headers(), headerMap);
128+
}
129+
130+
Message message = new GenericMessage<>(receivedRecord.value(), headerMap);
131+
Object convertedPayload = this.messageConverter.fromMessage(message, this.desiredValueType);
132+
if (convertedPayload == null) {
133+
throw new MessageConversionException(message, "Message cannot be converted by used MessageConverter");
134+
}
135+
136+
return rebuildConsumerRecord(receivedRecord, convertedPayload);
137+
}
138+
139+
@SuppressWarnings("unchecked")
140+
private static ConsumerRecord rebuildConsumerRecord(ConsumerRecord receivedRecord, Object convertedPayload) {
141+
return new ConsumerRecord(
142+
receivedRecord.topic(),
143+
receivedRecord.partition(),
144+
receivedRecord.offset(),
145+
receivedRecord.timestamp(),
146+
receivedRecord.timestampType(),
147+
receivedRecord.serializedKeySize(),
148+
receivedRecord.serializedValueSize(),
149+
receivedRecord.key(),
150+
convertedPayload,
151+
receivedRecord.headers(),
152+
receivedRecord.leaderEpoch()
153+
);
154+
}
155+
156+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright 2016-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.times;
24+
import static org.mockito.Mockito.verify;
25+
26+
import org.apache.kafka.clients.consumer.Consumer;
27+
import org.apache.kafka.clients.consumer.ConsumerRecord;
28+
import org.apache.kafka.common.header.internals.RecordHeader;
29+
import org.junit.jupiter.api.Test;
30+
31+
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
32+
import org.springframework.kafka.listener.MessageListener;
33+
import org.springframework.kafka.support.Acknowledgment;
34+
import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
35+
import org.springframework.messaging.Message;
36+
import org.springframework.messaging.MessageHeaders;
37+
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
38+
import org.springframework.messaging.converter.MessageConversionException;
39+
import org.springframework.messaging.converter.MessageConverter;
40+
41+
import com.fasterxml.jackson.core.JsonProcessingException;
42+
import com.fasterxml.jackson.databind.ObjectMapper;
43+
44+
/**
45+
* @author Adrian Chlebosz
46+
* @since 3.0.0
47+
*
48+
*/
49+
class ConvertingMessageListenerTests {
50+
51+
private final ObjectMapper mapper = new ObjectMapper();
52+
53+
@Test
54+
public void testMessageListenerIsInvokedWithConvertedSimpleRecord() {
55+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", 0);
56+
57+
var delegateListener = (MessageListener<String, Long>) (data) -> assertThat(data.value()).isNotNull();
58+
var convertingMessageListener = new ConvertingMessageListener<>(
59+
delegateListener,
60+
Long.class
61+
);
62+
63+
convertingMessageListener.onMessage(consumerRecord, null, null);
64+
}
65+
66+
@Test
67+
public void testMessageListenerIsInvokedWithRecordConvertedByCustomConverter() throws JsonProcessingException {
68+
var toBeConverted = new ToBeConverted("foo");
69+
var toBeConvertedJson = mapper.writeValueAsString(toBeConverted);
70+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", toBeConvertedJson);
71+
72+
var delegateListener = (MessageListener<String, ToBeConverted>) (data) -> {
73+
assertThat(data.value()).isNotNull();
74+
assertThat(data.value().getA()).isEqualTo("foo");
75+
};
76+
var convertingMessageListener = new ConvertingMessageListener<>(
77+
delegateListener,
78+
ToBeConverted.class
79+
);
80+
convertingMessageListener.setMessageConverter(new MappingJackson2MessageConverter());
81+
82+
convertingMessageListener.onMessage(consumerRecord, null, null);
83+
}
84+
85+
@SuppressWarnings({ "unchecked", "rawtypes" })
86+
@Test
87+
public void testMessageListenerIsInvokedOnlyOnce() {
88+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", 0);
89+
90+
var delegateListener = mock(AcknowledgingConsumerAwareMessageListener.class);
91+
var convertingMessageListener = new ConvertingMessageListener<>(
92+
delegateListener,
93+
Long.class
94+
);
95+
convertingMessageListener.setMessageConverter(new MappingJackson2MessageConverter());
96+
97+
convertingMessageListener.onMessage(consumerRecord, null, null);
98+
99+
verify(delegateListener, times(0)).onMessage(any());
100+
verify(delegateListener, times(0)).onMessage(any(), any(Acknowledgment.class));
101+
verify(delegateListener, times(0)).onMessage(any(), any(Consumer.class));
102+
verify(delegateListener, times(1)).onMessage(any(), any(), any());
103+
}
104+
105+
@Test
106+
public void testConversionFailsWhileUsingDefaultConverterForComplexObject() throws JsonProcessingException {
107+
var toBeConverted = new ToBeConverted("foo");
108+
var toBeConvertedJson = mapper.writeValueAsString(toBeConverted);
109+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", toBeConvertedJson);
110+
111+
var delegateListener = (MessageListener<String, ToBeConverted>) (data) -> {
112+
assertThat(data.value()).isNotNull();
113+
assertThat(data.value().getA()).isEqualTo("foo");
114+
};
115+
var convertingMessageListener = new ConvertingMessageListener<>(
116+
delegateListener,
117+
ToBeConverted.class
118+
);
119+
120+
assertThatThrownBy(
121+
() -> convertingMessageListener.onMessage(consumerRecord, null, null)
122+
).isInstanceOf(MessageConversionException.class);
123+
}
124+
125+
@Test
126+
public void testHeadersAreAccessibleDuringConversionWhenHeaderMapperIsSpecified() {
127+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", 0);
128+
var header = new RecordHeader("headerKey", "headerValue".getBytes());
129+
consumerRecord.headers().add(header);
130+
131+
var delegateListener = (MessageListener<String, Long>) (data) -> { };
132+
var messageConverter = new MessageConverter() {
133+
@Override
134+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
135+
var headers = message.getHeaders();
136+
assertThat(headers.containsKey("headerKey")).isTrue();
137+
assertThat(headers.get("headerKey", byte[].class)).isEqualTo(header.value());
138+
return 0L;
139+
}
140+
141+
@Override
142+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
143+
return null;
144+
}
145+
};
146+
var convertingMessageListener = new ConvertingMessageListener<>(
147+
delegateListener,
148+
Long.class
149+
);
150+
convertingMessageListener.setMessageConverter(messageConverter);
151+
convertingMessageListener.setKafkaHeaderMapper(new SimpleKafkaHeaderMapper());
152+
153+
convertingMessageListener.onMessage(consumerRecord, null, null);
154+
}
155+
156+
@Test
157+
public void testHeadersAreInaccessibleDuringConversionWhenHeaderMapperIsNotSpecified() {
158+
var consumerRecord = new ConsumerRecord<>("foo", 0, 0, "key", 0);
159+
var header = new RecordHeader("headerKey", "headerValue".getBytes());
160+
consumerRecord.headers().add(header);
161+
162+
var delegateListener = (MessageListener<String, Long>) (data) -> { };
163+
var messageConverter = new MessageConverter() {
164+
@Override
165+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
166+
var headers = message.getHeaders();
167+
assertThat(headers.containsKey("headerKey")).isFalse();
168+
return 0L;
169+
}
170+
171+
@Override
172+
public Message<?> toMessage(Object payload, MessageHeaders headers) {
173+
return null;
174+
}
175+
};
176+
var convertingMessageListener = new ConvertingMessageListener<>(
177+
delegateListener,
178+
Long.class
179+
);
180+
convertingMessageListener.setMessageConverter(messageConverter);
181+
182+
convertingMessageListener.onMessage(consumerRecord, null, null);
183+
}
184+
185+
private static class ToBeConverted {
186+
private String a;
187+
188+
ToBeConverted() {
189+
}
190+
191+
ToBeConverted(String a) {
192+
this.a = a;
193+
}
194+
195+
public String getA() {
196+
return a;
197+
}
198+
199+
public void setA(String a) {
200+
this.a = a;
201+
}
202+
}
203+
204+
}

0 commit comments

Comments
 (0)