From 962ba1af83170410b6721e981521b5c3a203c318 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 9 Apr 2020 18:14:24 -0400 Subject: [PATCH 1/4] GH-3103: Introduce CloudEvents transformers Fixes https://github.com/spring-projects/spring-integration/issues/3103 * Add an `io.cloudevents:cloudevents-api` optional dependency * Introduce a `HeaderMapper` and `Marshallers` in the `support.cloudevents` to marshal `CloudEvent` instances * Introduce a `ToCloudEventTransformer` to build a `CloudEvent` instance from a `Message` and optional marshaling logic if necessary. Such a transformer could be used as a general purpose CE protocol binder before sending a result message into the target protocol channel adapter --- build.gradle | 2 + .../support/cloudevents/HeaderMapper.java | 85 ++++++++ .../support/cloudevents/Marshallers.java | 80 +++++++ .../support/cloudevents/package-info.java | 4 + .../transformer/ToCloudEventTransformer.java | 203 ++++++++++++++++++ .../ToCloudEventTransformerTests.java | 114 ++++++++++ 6 files changed, 488 insertions(+) create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java create mode 100644 spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java diff --git a/build.gradle b/build.gradle index fffcde2899a..7a1243823dd 100644 --- a/build.gradle +++ b/build.gradle @@ -51,6 +51,7 @@ ext { assertjVersion = '3.15.0' assertkVersion = '0.22' awaitilityVersion = '4.0.2' + cloudEventsVersion = '1.3.0' commonsDbcp2Version = '2.7.0' commonsIoVersion = '2.6' commonsNetVersion = '3.6' @@ -419,6 +420,7 @@ project('spring-integration-core') { optionalApi "io.github.resilience4j:resilience4j-ratelimiter:$resilience4jVersion" optionalApi "org.apache.avro:avro:$avroVersion" optionalApi 'org.jetbrains.kotlin:kotlin-stdlib-jdk8' + optionalApi "io.cloudevents:cloudevents-api:$cloudEventsVersion" testImplementation ("org.aspectj:aspectjweaver:$aspectjVersion") testImplementation ('com.fasterxml.jackson.datatype:jackson-datatype-jsr310') diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java new file mode 100644 index 00000000000..155425c4c9a --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +import java.util.AbstractMap; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; + +import io.cloudevents.v1.ContextAttributes; + +/** + * A Cloud Event header mapper. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class HeaderMapper { + + /** + * Cloud event headers prefix as a {@value HEADER_PREFIX}. + */ + public static final String HEADER_PREFIX = "ce_"; + + /** + * Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper} + * @param attributes The map of attributes + * @param extensions The map of extensions + * @return The map of headers + */ + public static Map map(Map attributes, Map extensions) { + Assert.notNull(attributes, "'attributes' must noy be null"); + Assert.notNull(extensions, "'extensions' must noy be null"); + + Map result = + attributes.entrySet() + .stream() + .filter(attribute -> + attribute.getValue() != null + && !ContextAttributes.datacontenttype.name().equals(attribute.getKey())) + .map(header -> + new AbstractMap.SimpleEntry<>( + HEADER_PREFIX + header.getKey().toLowerCase(Locale.US), + header.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + result.putAll( + extensions.entrySet() + .stream() + .filter(extension -> extension.getValue() != null) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) + ); + + Optional.ofNullable(attributes + .get(ContextAttributes.datacontenttype.name())) + .ifPresent((dataContentType) -> { + result.put(MessageHeaders.CONTENT_TYPE, dataContentType); + }); + + return result; + } + + private HeaderMapper() { + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java new file mode 100644 index 00000000000..9712d4fa56a --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.messaging.MessageHeaders; + +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.format.BinaryMarshaller; +import io.cloudevents.format.StructuredMarshaller; +import io.cloudevents.format.Wire; +import io.cloudevents.format.builder.EventStep; +import io.cloudevents.json.Json; +import io.cloudevents.v1.Accessor; +import io.cloudevents.v1.AttributesImpl; + +/** + * A Cloud Events general purpose marshallers factory. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public final class Marshallers { + + private static final Map NO_HEADERS = new HashMap<>(); + + /** + * Builds a Binary Content Mode marshaller to marshal cloud events as JSON for + * any Transport Binding. + * @param The data type + * @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON + * @see BinaryMarshaller + */ + public static EventStep binary() { + return BinaryMarshaller.builder() + .map(AttributesImpl::marshal) + .map(Accessor::extensionsOf) + .map(ExtensionFormat::marshal) + .map(HeaderMapper::map) + .map(Json::binaryMarshal) + .builder(Wire::new); + } + + /** + * Builds a Structured Content Mode marshaller to marshal cloud event as JSON for + * any Transport Binding. + * @param The data type + * @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON + * @see StructuredMarshaller + */ + public static EventStep structured() { + return StructuredMarshaller. + builder() + .mime(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json") + .map((event) -> Json.binaryMarshal(event, NO_HEADERS)) + .skip(); + } + + private Marshallers() { + + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java new file mode 100644 index 00000000000..49c0e25db2f --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java @@ -0,0 +1,4 @@ +/** + * Provides classes supporting for Cloud Events. + */ +package org.springframework.integration.support.cloudevents; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java new file mode 100644 index 00000000000..a0d8e226b74 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java @@ -0,0 +1,203 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.transformer; + +import java.net.URI; +import java.time.ZonedDateTime; +import java.util.UUID; + +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.Expression; +import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.expression.ExpressionUtils; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.support.cloudevents.Marshallers; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; + +import io.cloudevents.CloudEvent; +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.format.Wire; +import io.cloudevents.format.builder.EventStep; +import io.cloudevents.v1.AttributesImpl; +import io.cloudevents.v1.CloudEventBuilder; +import io.cloudevents.v1.CloudEventImpl; + +/** + * An {@link AbstractTransformer} implementation to build a cloud event + * from the request message. + *

+ * This transformer may produce a message according a {@link ToCloudEventTransformer.Result} option. + * By default it is a {@link ToCloudEventTransformer.Result#RAW} + * with the meaning to produce a {@link io.cloudevents.CloudEvent} + * instance as a reply message payload. + *

+ * A {@link ToCloudEventTransformer.Result#BINARY} mode produces a marshalled into a {@code byte[]} + * a built {@link io.cloudevents.CloudEvent} body and respective cloud event headers. + *

+ * A {@link ToCloudEventTransformer.Result#STRUCTURED} mode produces a marshalled into a {@code byte[]} + * a whole {@link io.cloudevents.CloudEvent} and respective content type header + * with the {@code "application/cloudevents+json"} value. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class ToCloudEventTransformer extends AbstractTransformer { + + public enum Result { + + RAW, BINARY, STRUCTURED + + } + + private final URI source; + + @Nullable + private final EventStep wireBuilder; + + private Expression typeExpression = + new FunctionExpression>((message) -> message.getPayload().getClass().getName()); + + @Nullable + private Expression subjectExpression; + + @Nullable + private Expression dataSchemaExpression; + + @Nullable + private Expression extensionExpression; + + private EvaluationContext evaluationContext; + + public ToCloudEventTransformer(URI source) { + this(source, Result.RAW); + } + + public ToCloudEventTransformer(URI source, Result resultMode) { + Assert.notNull(source, "'source' must not be null"); + Assert.notNull(resultMode, "'resultMode' must not be null"); + this.source = source; + switch (resultMode) { + case BINARY: + this.wireBuilder = Marshallers.binary(); + break; + case STRUCTURED: + this.wireBuilder = Marshallers.structured(); + break; + default: + this.wireBuilder = null; + } + } + + public void setTypeExpression(Expression typeExpression) { + Assert.notNull(source, "'typeExpression' must not be null"); + this.typeExpression = typeExpression; + } + + public void setSubjectExpression(@Nullable Expression subjectExpression) { + this.subjectExpression = subjectExpression; + } + + public void setDataSchemaExpression(@Nullable Expression dataSchemaExpression) { + this.dataSchemaExpression = dataSchemaExpression; + } + + public void setExtensionExpression(@Nullable Expression extensionExpression) { + this.extensionExpression = extensionExpression; + } + + @Override + protected void onInit() { + super.onInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + + @Override + protected Object doTransform(Message message) { + CloudEventImpl cloudEvent = buildCloudEvent(message); + + if (this.wireBuilder != null) { + Wire wire = + this.wireBuilder.withEvent(() -> cloudEvent) + .marshal(); + + return getMessageBuilderFactory() + .withPayload(wire.getPayload().orElse(new byte[0])) + .copyHeaders(wire.getHeaders()) + .copyHeadersIfAbsent(message.getHeaders()) + .build(); + } + else { + return cloudEvent; + } + } + + @SuppressWarnings("unchecked") + private CloudEventImpl buildCloudEvent(Message message) { + MessageHeaders headers = message.getHeaders(); + Object payload = message.getPayload(); + + CloudEventBuilder cloudEventBuilder = + payload instanceof CloudEvent + ? CloudEventBuilder.builder((CloudEvent) payload) + : CloudEventBuilder.builder(); + + cloudEventBuilder.withId(headers.getId() != null + ? headers.getId().toString() + : UUID.randomUUID().toString()) + .withTime(ZonedDateTime.now()) + .withSource(this.source) + .withType(this.typeExpression.getValue(this.evaluationContext, message, String.class)); + + if (!(payload instanceof CloudEvent)) { + if (payload instanceof byte[]) { + cloudEventBuilder.withDataBase64((byte[]) payload); + } + else { + cloudEventBuilder.withData(payload); + } + } + + MimeType contentType = StaticMessageHeaderAccessor.getContentType(message); + + if (contentType != null) { + cloudEventBuilder.withDataContentType(contentType.toString()); + } + + if (this.subjectExpression != null) { + cloudEventBuilder.withSubject( + this.subjectExpression.getValue(this.evaluationContext, message, String.class)); + } + + if (this.dataSchemaExpression != null) { + cloudEventBuilder.withDataschema( + this.dataSchemaExpression.getValue(this.evaluationContext, message, URI.class)); + } + + if (this.extensionExpression != null) { + cloudEventBuilder.withExtension( + this.extensionExpression.getValue(this.evaluationContext, message, ExtensionFormat.class)); + } + + return cloudEventBuilder.build(); + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java new file mode 100644 index 00000000000..9df4a72c154 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java @@ -0,0 +1,114 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.transformer; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; + +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.json.JsonPathUtils; +import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.GenericMessage; + +import io.cloudevents.CloudEvent; + +/** + * @author Artem Bilan + * + * @since 5.3 + */ +public class ToCloudEventTransformerTests { + + private static final ConfigurableApplicationContext APPLICATION_CONTEXT = TestUtils.createTestApplicationContext(); + + private static final URI SOURCE = URI.create("https://spring.io/projects/spring-integration"); + + @AfterAll + static void teardown() { + APPLICATION_CONTEXT.close(); + } + + @Test + void testDefaultTransformer() { + ToCloudEventTransformer transformer = new ToCloudEventTransformer(SOURCE); + Message result = transformer.transform(new GenericMessage<>("test")); + assertThat(result.getHeaders()).containsOnlyKeys(MessageHeaders.ID, MessageHeaders.TIMESTAMP); + assertThat(result.getPayload()) + .asInstanceOf(InstanceOfAssertFactories.type(CloudEvent.class)) + .satisfies(event -> { + assertThat(event.getData().get()).isEqualTo("test"); + assertThat(event.getAttributes().getSource()).isEqualTo(SOURCE); + assertThat(event.getAttributes().getSpecversion()).isEqualTo("1.0"); + assertThat(event.getAttributes().getType()).isEqualTo(String.class.getName()); + assertThat(event.getAttributes().getMediaType().isPresent()).isFalse(); + } + ); + } + + @Test + void testBinary() { + ToCloudEventTransformer transformer = + new ToCloudEventTransformer(SOURCE, ToCloudEventTransformer.Result.BINARY); + transformer.setSubjectExpression(new LiteralExpression("some_subject")); + GenericMessage message = new GenericMessage<>("test"); + Message result = transformer.transform(message); + assertThat(result.getHeaders()) + .containsEntry("ce_type", String.class.getName()) + .containsEntry("ce_source", SOURCE.toString()) + .containsEntry("ce_id", message.getHeaders().getId().toString()) + .containsEntry("ce_subject", "some_subject") + .containsKeys("ce_time", "ce_specversion") + .doesNotContainKeys("ce_datacontenttype", MessageHeaders.CONTENT_TYPE, "ce_content_type"); + assertThat(result.getPayload()) + .isInstanceOf(byte[].class) + .isEqualTo("\"test\"".getBytes()); + } + + @Test + void testStructured() throws IOException { + ToCloudEventTransformer transformer = + new ToCloudEventTransformer(SOURCE, ToCloudEventTransformer.Result.STRUCTURED); + GenericMessage message = new GenericMessage<>("test"); + Message result = transformer.transform(message); + assertThat(result.getHeaders()) + .containsEntry(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json") + .doesNotContainKeys("ce_id", "ce_source", "ce_datacontenttype", "ce_time", "ce_specversion"); + Object payload = result.getPayload(); + assertThat(payload).isInstanceOf(byte[].class); + + List jsonPath = JsonPathUtils.evaluate(payload, "$..data"); + assertThat(jsonPath.get(0)).isEqualTo("test"); + + jsonPath = JsonPathUtils.evaluate(payload, "$..source"); + assertThat(jsonPath.get(0)).isEqualTo(SOURCE.toString()); + + jsonPath = JsonPathUtils.evaluate(payload, "$..type"); + assertThat(jsonPath.get(0)).isEqualTo(String.class.getName()); + + } + +} From e79579adb7ab88ac8351831d748e10e20c559079 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 10 Apr 2020 14:37:39 -0400 Subject: [PATCH 2/4] * Introduce `CloudEventHeaders` with well-known constant for Cloud Event headers in the message * Introduce a `ContentTypeDelegatingDataMarshaller` based on the `org.springframework.core.codec.Encoder` abstraction to delegate * Use `ContentTypeDelegatingDataMarshaller` from the `ToCloudEventTransformer` * Modify `ToCloudEventTransformerTests` to use constants from the `CloudEventHeaders` and verify that `text/plain` marshalling works well for cloud events --- .../cloudevents/CloudEventHeaders.java | 79 +++++++++++++++ .../ContentTypeDelegatingDataMarshaller.java | 98 +++++++++++++++++++ .../support/cloudevents/HeaderMapper.java | 9 +- .../support/cloudevents/Marshallers.java | 18 +++- .../transformer/ToCloudEventTransformer.java | 23 ++++- .../ToCloudEventTransformerTests.java | 33 +++++-- 6 files changed, 239 insertions(+), 21 deletions(-) create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java create mode 100644 spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java new file mode 100644 index 00000000000..3add90b93ff --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/CloudEventHeaders.java @@ -0,0 +1,79 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +/** + * Message headers for basic cloud event attributes. + * These headers might be remapped to respective attributes/headers + * in the target protocol binder. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public final class CloudEventHeaders { + + private CloudEventHeaders() { + } + + /** + * Header prefix as a {@value PREFIX} for cloud event attributes. + */ + public static final String PREFIX = "ce_"; + + /** + * The header name for cloud event {@code id} attribute. + */ + public static final String ID = PREFIX + "id"; + + /** + * The header name for cloud event {@code source} attribute. + */ + public static final String SOURCE = PREFIX + "source"; + + /** + * The header name for cloud event {@code specversion} attribute. + */ + public static final String SPEC_VERSION = PREFIX + "specversion"; + + /** + * The header name for cloud event {@code type} attribute. + */ + public static final String TYPE = PREFIX + "type"; + + /** + * The header name for cloud event {@code datacontenttype} attribute. + */ + public static final String DATA_CONTENT_TYPE = PREFIX + "datacontenttype"; + + /** + * The header name for cloud event {@code dataschema} attribute. + */ + public static final String DATA_SCHEMA = PREFIX + "dataschema"; + + /** + * The header name for cloud event {@code subject} attribute. + */ + public static final String SUBJECT = PREFIX + "subject"; + + /** + * The header name for cloud event {@code time} attribute. + */ + public static final String TIME = PREFIX + "time"; + + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java new file mode 100644 index 00000000000..5f6cbea33b5 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.cloudevents; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.CharSequenceEncoder; +import org.springframework.core.codec.Encoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.messaging.MessageHeaders; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; + +import io.cloudevents.fun.DataMarshaller; +import io.cloudevents.json.Json; + +/** + * A {@link DataMarshaller} implementation for delegating + * to the provided {@link Encoder}s according a {@link MessageHeaders#CONTENT_TYPE} + * header value. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class ContentTypeDelegatingDataMarshaller implements DataMarshaller { + + private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); + + private final List> encoders = new ArrayList<>(); + + @SafeVarargs + public ContentTypeDelegatingDataMarshaller(Encoder... encoders) { + this.encoders.add(CharSequenceEncoder.allMimeTypes()); + setEncoders(encoders); + } + + @SafeVarargs + public final void setEncoders(Encoder... encoders) { + Assert.notNull(encoders, "'encoders' must not be null"); + Assert.noNullElements(encoders, "'encoders' must not contain null elements"); + this.encoders.addAll(Arrays.asList(encoders)); + } + + @Override + @SuppressWarnings({ "unchecked", "rawtypes" }) + public byte[] marshal(T data, Map headers) throws RuntimeException { + String contentType = headers.get(MessageHeaders.CONTENT_TYPE); + if (contentType == null) { // Assume JSON by default + return Json.binaryMarshal(data, headers); + } + else { + ResolvableType elementType = ResolvableType.forClass(data.getClass()); + MimeType mimeType = MimeType.valueOf(contentType); + Encoder encoder = encoder(elementType, mimeType); + DataBuffer dataBuffer = + encoder.encodeValue(data, this.dataBufferFactory, elementType, + mimeType, (Map) (Map) headers); + + ByteBuffer buf = dataBuffer.asByteBuffer(); + byte[] result = new byte[buf.remaining()]; + buf.get(result); + return result; + } + } + + @SuppressWarnings("unchecked") + private Encoder encoder(ResolvableType elementType, MimeType mimeType) { + for (Encoder encoder : this.encoders) { + if (encoder.canEncode(elementType, mimeType)) { + return (Encoder) encoder; + } + } + throw new IllegalArgumentException("No encoder for " + elementType); + } + +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java index 155425c4c9a..a0341aa0e81 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java @@ -34,12 +34,7 @@ * * @since 5.3 */ -public class HeaderMapper { - - /** - * Cloud event headers prefix as a {@value HEADER_PREFIX}. - */ - public static final String HEADER_PREFIX = "ce_"; +public final class HeaderMapper { /** * Following the signature of {@link io.cloudevents.fun.FormatHeaderMapper} @@ -59,7 +54,7 @@ public static Map map(Map attributes, Map new AbstractMap.SimpleEntry<>( - HEADER_PREFIX + header.getKey().toLowerCase(Locale.US), + CloudEventHeaders.PREFIX + header.getKey().toLowerCase(Locale.US), header.getValue())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java index 9712d4fa56a..2600c0607ea 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/Marshallers.java @@ -26,6 +26,7 @@ import io.cloudevents.format.StructuredMarshaller; import io.cloudevents.format.Wire; import io.cloudevents.format.builder.EventStep; +import io.cloudevents.fun.DataMarshaller; import io.cloudevents.json.Json; import io.cloudevents.v1.Accessor; import io.cloudevents.v1.AttributesImpl; @@ -49,12 +50,27 @@ public final class Marshallers { * @see BinaryMarshaller */ public static EventStep binary() { + return binary(Json::binaryMarshal); + } + + /** + * Builds a Binary Content Mode marshaller to marshal cloud events as a {@code byte[]} for + * any Transport Binding. + * The data marshalling is based on the provided {@link DataMarshaller}. + * @param marshaller the {@link DataMarshaller} for cloud event payload. + * @param The data type + * @return a builder to provide the {@link io.cloudevents.CloudEvent} and marshal as JSON + * @see BinaryMarshaller + */ + public static EventStep binary( + DataMarshaller marshaller) { + return BinaryMarshaller.builder() .map(AttributesImpl::marshal) .map(Accessor::extensionsOf) .map(ExtensionFormat::marshal) .map(HeaderMapper::map) - .map(Json::binaryMarshal) + .map(marshaller) .builder(Wire::new); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java index a0d8e226b74..36f4b971147 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java @@ -20,11 +20,13 @@ import java.time.ZonedDateTime; import java.util.UUID; +import org.springframework.core.codec.Encoder; import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.expression.FunctionExpression; +import org.springframework.integration.support.cloudevents.ContentTypeDelegatingDataMarshaller; import org.springframework.integration.support.cloudevents.Marshallers; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -70,6 +72,9 @@ public enum Result { private final URI source; + private final ContentTypeDelegatingDataMarshaller dataMarshaller = + new ContentTypeDelegatingDataMarshaller<>(); + @Nullable private final EventStep wireBuilder; @@ -97,7 +102,7 @@ public ToCloudEventTransformer(URI source, Result resultMode) { this.source = source; switch (resultMode) { case BINARY: - this.wireBuilder = Marshallers.binary(); + this.wireBuilder = Marshallers.binary(this.dataMarshaller); break; case STRUCTURED: this.wireBuilder = Marshallers.structured(); @@ -108,7 +113,7 @@ public ToCloudEventTransformer(URI source, Result resultMode) { } public void setTypeExpression(Expression typeExpression) { - Assert.notNull(source, "'typeExpression' must not be null"); + Assert.notNull(typeExpression, "'typeExpression' must not be null"); this.typeExpression = typeExpression; } @@ -124,6 +129,18 @@ public void setExtensionExpression(@Nullable Expression extensionExpression) { this.extensionExpression = extensionExpression; } + /** + * Configure a set of {@link Encoder}s for content type based data marshalling. + * They are used only for the the {@link Result#BINARY} mode and when inbound payload + * is not a {@code byte[]} already. + * Plus {@link MessageHeaders#CONTENT_TYPE} must be present in the request message. + * @param encoders the {@link Encoder}s to use. + */ + @SafeVarargs + public final void setEncoders(Encoder... encoders) { + this.dataMarshaller.setEncoders(encoders); + } + @Override protected void onInit() { super.onInit(); @@ -140,7 +157,7 @@ protected Object doTransform(Message message) { .marshal(); return getMessageBuilderFactory() - .withPayload(wire.getPayload().orElse(new byte[0])) + .withPayload(wire.getPayload().orElse(cloudEvent.getDataBase64())) .copyHeaders(wire.getHeaders()) .copyHeadersIfAbsent(message.getHeaders()) .build(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java index 9df4a72c154..26758d3eae5 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ToCloudEventTransformerTests.java @@ -29,10 +29,13 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.json.JsonPathUtils; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.cloudevents.CloudEventHeaders; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.GenericMessage; +import org.springframework.util.MimeTypeUtils; import io.cloudevents.CloudEvent; @@ -74,18 +77,24 @@ void testBinary() { ToCloudEventTransformer transformer = new ToCloudEventTransformer(SOURCE, ToCloudEventTransformer.Result.BINARY); transformer.setSubjectExpression(new LiteralExpression("some_subject")); - GenericMessage message = new GenericMessage<>("test"); + Message message = + MessageBuilder.withPayload("test") + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN) + .build(); Message result = transformer.transform(message); assertThat(result.getHeaders()) - .containsEntry("ce_type", String.class.getName()) - .containsEntry("ce_source", SOURCE.toString()) - .containsEntry("ce_id", message.getHeaders().getId().toString()) - .containsEntry("ce_subject", "some_subject") - .containsKeys("ce_time", "ce_specversion") - .doesNotContainKeys("ce_datacontenttype", MessageHeaders.CONTENT_TYPE, "ce_content_type"); + .containsEntry(CloudEventHeaders.TYPE, String.class.getName()) + .containsEntry(CloudEventHeaders.SOURCE, SOURCE.toString()) + .containsEntry(CloudEventHeaders.ID, message.getHeaders().getId().toString()) + .containsEntry(CloudEventHeaders.SUBJECT, "some_subject") + .containsEntry(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE) + .containsKeys(CloudEventHeaders.TIME, CloudEventHeaders.SPEC_VERSION) + .doesNotContainKeys( + CloudEventHeaders.DATA_CONTENT_TYPE, + "ce_content_type"); assertThat(result.getPayload()) .isInstanceOf(byte[].class) - .isEqualTo("\"test\"".getBytes()); + .isEqualTo("test".getBytes()); } @Test @@ -96,7 +105,12 @@ void testStructured() throws IOException { Message result = transformer.transform(message); assertThat(result.getHeaders()) .containsEntry(MessageHeaders.CONTENT_TYPE, "application/cloudevents+json") - .doesNotContainKeys("ce_id", "ce_source", "ce_datacontenttype", "ce_time", "ce_specversion"); + .doesNotContainKeys( + CloudEventHeaders.ID, + CloudEventHeaders.SOURCE, + CloudEventHeaders.DATA_CONTENT_TYPE, + CloudEventHeaders.TIME, + CloudEventHeaders.SPEC_VERSION); Object payload = result.getPayload(); assertThat(payload).isInstanceOf(byte[].class); @@ -108,7 +122,6 @@ void testStructured() throws IOException { jsonPath = JsonPathUtils.evaluate(payload, "$..type"); assertThat(jsonPath.get(0)).isEqualTo(String.class.getName()); - } } From 643a05653f73f6676e2d98b237cdcffaf6c753b0 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 10 Apr 2020 16:22:28 -0400 Subject: [PATCH 3/4] * Remove generic arg from the `ContentTypeDelegatingDataMarshaller` --- .../ContentTypeDelegatingDataMarshaller.java | 16 +++++++--------- .../transformer/ToCloudEventTransformer.java | 6 ++---- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java index 5f6cbea33b5..75d9dac01a1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/ContentTypeDelegatingDataMarshaller.java @@ -44,20 +44,18 @@ * * @since 5.3 */ -public class ContentTypeDelegatingDataMarshaller implements DataMarshaller { +public class ContentTypeDelegatingDataMarshaller implements DataMarshaller { private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); private final List> encoders = new ArrayList<>(); - @SafeVarargs - public ContentTypeDelegatingDataMarshaller(Encoder... encoders) { + public ContentTypeDelegatingDataMarshaller(Encoder... encoders) { this.encoders.add(CharSequenceEncoder.allMimeTypes()); setEncoders(encoders); } - @SafeVarargs - public final void setEncoders(Encoder... encoders) { + public final void setEncoders(Encoder... encoders) { Assert.notNull(encoders, "'encoders' must not be null"); Assert.noNullElements(encoders, "'encoders' must not contain null elements"); this.encoders.addAll(Arrays.asList(encoders)); @@ -65,7 +63,7 @@ public final void setEncoders(Encoder... encoders) { @Override @SuppressWarnings({ "unchecked", "rawtypes" }) - public byte[] marshal(T data, Map headers) throws RuntimeException { + public byte[] marshal(Object data, Map headers) throws RuntimeException { String contentType = headers.get(MessageHeaders.CONTENT_TYPE); if (contentType == null) { // Assume JSON by default return Json.binaryMarshal(data, headers); @@ -73,7 +71,7 @@ public byte[] marshal(T data, Map headers) throws RuntimeExcepti else { ResolvableType elementType = ResolvableType.forClass(data.getClass()); MimeType mimeType = MimeType.valueOf(contentType); - Encoder encoder = encoder(elementType, mimeType); + Encoder encoder = encoder(elementType, mimeType); DataBuffer dataBuffer = encoder.encodeValue(data, this.dataBufferFactory, elementType, mimeType, (Map) (Map) headers); @@ -86,10 +84,10 @@ public byte[] marshal(T data, Map headers) throws RuntimeExcepti } @SuppressWarnings("unchecked") - private Encoder encoder(ResolvableType elementType, MimeType mimeType) { + private Encoder encoder(ResolvableType elementType, MimeType mimeType) { for (Encoder encoder : this.encoders) { if (encoder.canEncode(elementType, mimeType)) { - return (Encoder) encoder; + return (Encoder) encoder; } } throw new IllegalArgumentException("No encoder for " + elementType); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java index 36f4b971147..82f9c4e233b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/ToCloudEventTransformer.java @@ -72,8 +72,7 @@ public enum Result { private final URI source; - private final ContentTypeDelegatingDataMarshaller dataMarshaller = - new ContentTypeDelegatingDataMarshaller<>(); + private final ContentTypeDelegatingDataMarshaller dataMarshaller = new ContentTypeDelegatingDataMarshaller(); @Nullable private final EventStep wireBuilder; @@ -136,8 +135,7 @@ public void setExtensionExpression(@Nullable Expression extensionExpression) { * Plus {@link MessageHeaders#CONTENT_TYPE} must be present in the request message. * @param encoders the {@link Encoder}s to use. */ - @SafeVarargs - public final void setEncoders(Encoder... encoders) { + public final void setEncoders(Encoder... encoders) { this.dataMarshaller.setEncoders(encoders); } From c3fadf79945ad7d6d59bce549412058f6ea20ac4 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 16 Apr 2020 10:57:35 -0400 Subject: [PATCH 4/4] * Fix typos in error messages & JavaDocs --- .../integration/support/cloudevents/HeaderMapper.java | 4 ++-- .../integration/support/cloudevents/package-info.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java index a0341aa0e81..84f8d6386d4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/HeaderMapper.java @@ -43,8 +43,8 @@ public final class HeaderMapper { * @return The map of headers */ public static Map map(Map attributes, Map extensions) { - Assert.notNull(attributes, "'attributes' must noy be null"); - Assert.notNull(extensions, "'extensions' must noy be null"); + Assert.notNull(attributes, "'attributes' must not be null"); + Assert.notNull(extensions, "'extensions' must not be null"); Map result = attributes.entrySet() diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java index 49c0e25db2f..83a06a78fcd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/cloudevents/package-info.java @@ -1,4 +1,4 @@ /** - * Provides classes supporting for Cloud Events. + * Provides classes to support for Cloud Events. */ package org.springframework.integration.support.cloudevents;