diff --git a/build.gradle b/build.gradle index fffcde2899a..7a1243823dd 100644 --- a/build.gradle +++ b/build.gradle @@ -51,6 +51,7 @@ ext { assertjVersion = '3.15.0' assertkVersion = '0.22' awaitilityVersion = '4.0.2' + cloudEventsVersion = '1.3.0' commonsDbcp2Version = '2.7.0' commonsIoVersion = '2.6' commonsNetVersion = '3.6' @@ -419,6 +420,7 @@ project('spring-integration-core') { optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion" optionalApi "org.apache.avro:avro:$avroVersion" optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' + optionalApi "io.cloudevents:cloudevents-api:$cloudEventsVersion" testImplementation ("org.aspectj:aspectjweaver:$aspectjVersion") testImplementation ('com.fasterxml.jackson.datatype:jackson-datatype-jsr310') diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java new file mode 100644 index 00000000000..3add90b93ff --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +/** + * Message headers for basic cloud event attributes. + * These headers might be remapped to respective attributes/headers + * in the target protocol binder. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public final class CloudEventHeaders { + + private CloudEventHeaders() { + } + + /** + * Header prefix as a {@value PREFIX} for cloud event attributes. + */ + public static final String PREFIX = "ce_"; + + /** + * The header name for cloud event {@code id} attribute. + */ + public static final String ID = PREFIX + "id"; + + /** + * The header name for cloud event {@code source} attribute. + */ + public static final String SOURCE = PREFIX + "source"; + + /** + * The header name for cloud event {@code specversion} attribute. + */ + public static final String SPEC_VERSION = PREFIX + "specversion"; + + /** + * The header name for cloud event {@code type} attribute. + */ + public static final String TYPE = PREFIX + "type"; + + /** + * The header name for cloud event {@code datacontenttype} attribute. + */ + public static final String DATA_CONTENT_TYPE = PREFIX + "datacontenttype"; + + /** + * The header name for cloud event {@code dataschema} attribute. + */ + public static final String DATA_SCHEMA = PREFIX + "dataschema"; + + /** + * The header name for cloud event {@code subject} attribute. + */ + public static final String SUBJECT = PREFIX + "subject"; + + /** + * The header name for cloud event {@code time} attribute. + */ + public static final String TIME = PREFIX + "time"; + + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java new file mode 100644 index 00000000000..75d9dac01a1 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.CharSequenceEncoder; +import org.springframework.core.codec.Encoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; + +import io.cloudevents.fun.DataMarshaller; +import io.cloudevents.json.Json; + +/** + * A {@link DataMarshaller} implementation for delegating + * to the provided {@link Encoder}s according a {@link MessageHeaders#CONTENT_TYPE} + * header value. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class ContentTypeDelegatingDataMarshaller implements DataMarshaller { + + private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); + + private final List> encoders = new ArrayList<>(); + + public ContentTypeDelegatingDataMarshaller(Encoder... encoders) { + this.encoders.add(CharSequenceEncoder.allMimeTypes()); + setEncoders(encoders); + } + + public final void setEncoders(Encoder... encoders) { + Assert.notNull(encoders, "'encoders' must not be null"); + Assert.noNullElements(encoders, "'encoders' must not contain null elements"); + this.encoders.addAll(Arrays.asList(encoders)); + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public byte[] marshal(Object data, Map headers) throws RuntimeException { + String contentType = headers.get(MessageHeaders.CONTENT_TYPE); + if (contentType == null) { // Assume JSON by default + return Json.binaryMarshal(data, headers); + } + else { + ResolvableType elementType = ResolvableType.forClass(data.getClass()); + MimeType mimeType = MimeType.valueOf(contentType); + Encoder encoder = encoder(elementType, mimeType); + DataBuffer dataBuffer = + encoder.encodeValue(data, this.dataBufferFactory, elementType, + mimeType, (Map) (Map) headers); + + ByteBuffer buf = dataBuffer.asByteBuffer(); + byte[] result = new byte[buf.remaining()]; + buf.get(result); + return result; + } + } + + @SuppressWarnings("unchecked") + private Encoder encoder(ResolvableType elementType, MimeType mimeType) { + for (Encoder encoder : this.encoders) { + if (encoder.canEncode(elementType, mimeType)) { + return (Encoder) encoder; + } + } + throw new IllegalArgumentException("No encoder for " + elementType); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java new file mode 100644 index 00000000000..84f8d6386d4 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +import java.util.AbstractMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; + +import io.cloudevents.v1.ContextAttributes; + +/** + * A Cloud Event header mapper. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public final class HeaderMapper { + + /** + * Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper} + * @param attributes The map of attributes + * @param extensions The map of extensions + * @return The map of headers + */ + public static Map map(Map attributes, Map extensions) { + Assert.notNull(attributes, "'attributes' must not be null"); + Assert.notNull(extensions, "'extensions' must not be null"); + + Map result = + attributes.entrySet() + .stream() + .filter(attribute -> + attribute.getValue() != null + && !ContextAttributes.datacontenttype.name().equals(attribute.getKey())) + .map(header -> + new AbstractMap.SimpleEntry<>( + CloudEventHeaders.PREFIX + header.getKey().toLowerCase(Locale.US), + header.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + result.putAll( + extensions.entrySet() + .stream() + .filter(extension -> extension.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); + + Optional.ofNullable(attributes + .get(ContextAttributes.datacontenttype.name())) + .ifPresent((dataContentType) -> { + result.put(MessageHeaders.CONTENT_TYPE, dataContentType); + }); + + return result; + } + + private HeaderMapper() { + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java new file mode 100644 index 00000000000..2600c0607ea --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.messaging.MessageHeaders; + +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.format.BinaryMarshaller; +import io.cloudevents.format.StructuredMarshaller; +import io.cloudevents.format.Wire; +import io.cloudevents.format.builder.EventStep; +import io.cloudevents.fun.DataMarshaller; +import io.cloudevents.json.Json; +import io.cloudevents.v1.Accessor; +import io.cloudevents.v1.AttributesImpl; + +/** + * A Cloud Events general purpose marshallers factory. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public final class Marshallers { + + private static final Map NO_HEADERS = new HashMap<>(); + + /** + * Builds a Binary Content Mode marshaller to marshal cloud events as JSON for + * any Transport Binding. + * @param The data type + * @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON + * @see BinaryMarshaller + */ + public static EventStep binary() { + return binary(Json::binaryMarshal); + } + + /** + * Builds a Binary Content Mode marshaller to marshal cloud events as a {@code byte[]} for + * any Transport Binding. + * The data marshalling is based on the provided {@link DataMarshaller}. + * @param marshaller the {@link DataMarshaller} for cloud event payload. + * @param The data type + * @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON + * @see BinaryMarshaller + */ + public static EventStep binary( + DataMarshaller marshaller) { + + return BinaryMarshaller.builder() + .map(AttributesImpl::marshal) + .map(Accessor::extensionsOf) + .map(ExtensionFormat::marshal) + .map(HeaderMapper::map) + .map(marshaller) + .builder(Wire::new); + } + + /** + * Builds a Structured Content Mode marshaller to marshal cloud event as JSON for + * any Transport Binding. + * @param The data type + * @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON + * @see StructuredMarshaller + */ + public static EventStep structured() { + return StructuredMarshaller. + builder() + .mime(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json") + .map((event) -> Json.binaryMarshal(event, NO_HEADERS)) + .skip(); + } + + private Marshallers() { + + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java new file mode 100644 index 00000000000..83a06a78fcd --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes to support for Cloud Events. + */ +package org.springframework.integration.support.cloudevents; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java new file mode 100644 index 00000000000..82f9c4e233b --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java @@ -0,0 +1,218 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.transformer; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.UUID; + +import org.springframework.core.codec.Encoder; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.support.cloudevents.ContentTypeDelegatingDataMarshaller; +import org.springframework.integration.support.cloudevents.Marshallers; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; + +import io.cloudevents.CloudEvent; +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.format.Wire; +import io.cloudevents.format.builder.EventStep; +import io.cloudevents.v1.AttributesImpl; +import io.cloudevents.v1.CloudEventBuilder; +import io.cloudevents.v1.CloudEventImpl; + +/** + * An {@link AbstractTransformer} implementation to build a cloud event + * from the request message. + *

+ * This transformer may produce a message according a {@link ToCloudEventTransformer.Result} option. + * By default it is a {@link ToCloudEventTransformer.Result#RAW} + * with the meaning to produce a {@link io.cloudevents.CloudEvent} + * instance as a reply message payload. + *

+ * A {@link ToCloudEventTransformer.Result#BINARY} mode produces a marshalled into a {@code byte[]} + * a built {@link io.cloudevents.CloudEvent} body and respective cloud event headers. + *

+ * A {@link ToCloudEventTransformer.Result#STRUCTURED} mode produces a marshalled into a {@code byte[]} + * a whole {@link io.cloudevents.CloudEvent} and respective content type header + * with the {@code "application/cloudevents+json"} value. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class ToCloudEventTransformer extends AbstractTransformer { + + public enum Result { + + RAW, BINARY, STRUCTURED + + } + + private final URI source; + + private final ContentTypeDelegatingDataMarshaller dataMarshaller = new ContentTypeDelegatingDataMarshaller(); + + @Nullable + private final EventStep wireBuilder; + + private Expression typeExpression = + new FunctionExpression>((message) -> message.getPayload().getClass().getName()); + + @Nullable + private Expression subjectExpression; + + @Nullable + private Expression dataSchemaExpression; + + @Nullable + private Expression extensionExpression; + + private EvaluationContext evaluationContext; + + public ToCloudEventTransformer(URI source) { + this(source, Result.RAW); + } + + public ToCloudEventTransformer(URI source, Result resultMode) { + Assert.notNull(source, "'source' must not be null"); + Assert.notNull(resultMode, "'resultMode' must not be null"); + this.source = source; + switch (resultMode) { + case BINARY: + this.wireBuilder = Marshallers.binary(this.dataMarshaller); + break; + case STRUCTURED: + this.wireBuilder = Marshallers.structured(); + break; + default: + this.wireBuilder = null; + } + } + + public void setTypeExpression(Expression typeExpression) { + Assert.notNull(typeExpression, "'typeExpression' must not be null"); + this.typeExpression = typeExpression; + } + + public void setSubjectExpression(@Nullable Expression subjectExpression) { + this.subjectExpression = subjectExpression; + } + + public void setDataSchemaExpression(@Nullable Expression dataSchemaExpression) { + this.dataSchemaExpression = dataSchemaExpression; + } + + public void setExtensionExpression(@Nullable Expression extensionExpression) { + this.extensionExpression = extensionExpression; + } + + /** + * Configure a set of {@link Encoder}s for content type based data marshalling. + * They are used only for the the {@link Result#BINARY} mode and when inbound payload + * is not a {@code byte[]} already. + * Plus {@link MessageHeaders#CONTENT_TYPE} must be present in the request message. + * @param encoders the {@link Encoder}s to use. + */ + public final void setEncoders(Encoder... encoders) { + this.dataMarshaller.setEncoders(encoders); + } + + @Override + protected void onInit() { + super.onInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + + @Override + protected Object doTransform(Message message) { + CloudEventImpl cloudEvent = buildCloudEvent(message); + + if (this.wireBuilder != null) { + Wire wire = + this.wireBuilder.withEvent(() -> cloudEvent) + .marshal(); + + return getMessageBuilderFactory() + .withPayload(wire.getPayload().orElse(cloudEvent.getDataBase64())) + .copyHeaders(wire.getHeaders()) + .copyHeadersIfAbsent(message.getHeaders()) + .build(); + } + else { + return cloudEvent; + } + } + + @SuppressWarnings("unchecked") + private CloudEventImpl buildCloudEvent(Message message) { + MessageHeaders headers = message.getHeaders(); + Object payload = message.getPayload(); + + CloudEventBuilder cloudEventBuilder = + payload instanceof CloudEvent + ? CloudEventBuilder.builder((CloudEvent) payload) + : CloudEventBuilder.builder(); + + cloudEventBuilder.withId(headers.getId() != null + ? headers.getId().toString() + : UUID.randomUUID().toString()) + .withTime(ZonedDateTime.now()) + .withSource(this.source) + .withType(this.typeExpression.getValue(this.evaluationContext, message, String.class)); + + if (!(payload instanceof CloudEvent)) { + if (payload instanceof byte[]) { + cloudEventBuilder.withDataBase64((byte[]) payload); + } + else { + cloudEventBuilder.withData(payload); + } + } + + MimeType contentType = StaticMessageHeaderAccessor.getContentType(message); + + if (contentType != null) { + cloudEventBuilder.withDataContentType(contentType.toString()); + } + + if (this.subjectExpression != null) { + cloudEventBuilder.withSubject( + this.subjectExpression.getValue(this.evaluationContext, message, String.class)); + } + + if (this.dataSchemaExpression != null) { + cloudEventBuilder.withDataschema( + this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class)); + } + + if (this.extensionExpression != null) { + cloudEventBuilder.withExtension( + this.extensionExpression.getValue(this.evaluationContext, message, ExtensionFormat.class)); + } + + return cloudEventBuilder.build(); + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java new file mode 100644 index 00000000000..26758d3eae5 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java @@ -0,0 +1,127 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.transformer; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.json.JsonPathUtils; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.cloudevents.CloudEventHeaders; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.MimeTypeUtils; + +import io.cloudevents.CloudEvent; + +/** + * @author Artem Bilan + * + * @since 5.3 + */ +public class ToCloudEventTransformerTests { + + private static final ConfigurableApplicationContext APPLICATION_CONTEXT = TestUtils.createTestApplicationContext(); + + private static final URI SOURCE = URI.create("https://spring.io/projects/spring-integration"); + + @AfterAll + static void teardown() { + APPLICATION_CONTEXT.close(); + } + + @Test + void testDefaultTransformer() { + ToCloudEventTransformer transformer = new ToCloudEventTransformer(SOURCE); + Message result = transformer.transform(new GenericMessage<>("test")); + assertThat(result.getHeaders()).containsOnlyKeys(MessageHeaders.ID, MessageHeaders.TIMESTAMP); + assertThat(result.getPayload()) + .asInstanceOf(InstanceOfAssertFactories.type(CloudEvent.class)) + .satisfies(event -> { + assertThat(event.getData().get()).isEqualTo("test"); + assertThat(event.getAttributes().getSource()).isEqualTo(SOURCE); + assertThat(event.getAttributes().getSpecversion()).isEqualTo("1.0"); + assertThat(event.getAttributes().getType()).isEqualTo(String.class.getName()); + assertThat(event.getAttributes().getMediaType().isPresent()).isFalse(); + } + ); + } + + @Test + void testBinary() { + ToCloudEventTransformer transformer = + new ToCloudEventTransformer(SOURCE, ToCloudEventTransformer.Result.BINARY); + transformer.setSubjectExpression(new LiteralExpression("some_subject")); + Message message = + MessageBuilder.withPayload("test") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build(); + Message result = transformer.transform(message); + assertThat(result.getHeaders()) + .containsEntry(CloudEventHeaders.TYPE, String.class.getName()) + .containsEntry(CloudEventHeaders.SOURCE, SOURCE.toString()) + .containsEntry(CloudEventHeaders.ID, message.getHeaders().getId().toString()) + .containsEntry(CloudEventHeaders.SUBJECT, "some_subject") + .containsEntry(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE) + .containsKeys(CloudEventHeaders.TIME, CloudEventHeaders.SPEC_VERSION) + .doesNotContainKeys( + CloudEventHeaders.DATA_CONTENT_TYPE, + "ce_content_type"); + assertThat(result.getPayload()) + .isInstanceOf(byte[].class) + .isEqualTo("test".getBytes()); + } + + @Test + void testStructured() throws IOException { + ToCloudEventTransformer transformer = + new ToCloudEventTransformer(SOURCE, ToCloudEventTransformer.Result.STRUCTURED); + GenericMessage message = new GenericMessage<>("test"); + Message result = transformer.transform(message); + assertThat(result.getHeaders()) + .containsEntry(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json") + .doesNotContainKeys( + CloudEventHeaders.ID, + CloudEventHeaders.SOURCE, + CloudEventHeaders.DATA_CONTENT_TYPE, + CloudEventHeaders.TIME, + CloudEventHeaders.SPEC_VERSION); + Object payload = result.getPayload(); + assertThat(payload).isInstanceOf(byte[].class); + + List jsonPath = JsonPathUtils.evaluate(payload, "$..data"); + assertThat(jsonPath.get(0)).isEqualTo("test"); + + jsonPath = JsonPathUtils.evaluate(payload, "$..source"); + assertThat(jsonPath.get(0)).isEqualTo(SOURCE.toString()); + + jsonPath = JsonPathUtils.evaluate(payload, "$..type"); + assertThat(jsonPath.get(0)).isEqualTo(String.class.getName()); + } + +}