Skip to content

Add @PulsarMessage for default topic/schema #565

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
==== Configuration properties
Schema mappings can be configured with the `spring.pulsar.defaults.type-mappings` property.
The following example uses `application.yml` to add mappings for the `User` and `Address` complex objects using `AVRO` and `JSON` schemas, respectively:

Expand All @@ -17,6 +18,7 @@ spring:

NOTE: The `message-type` is the fully-qualified name of the message class.

==== Schema resolver customizer
The preferred method of adding mappings is via the property mentioned above.
However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).

Expand All @@ -32,3 +34,16 @@ public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer(
}
}
----

==== Type mapping annotation
Another option for specifying default schema information to use for a particular message type is to mark the message class with the `@PulsarMessage` annotation.
The schema info can be specified via the `schemaType` attribute on the annotation.

The following example configures the system to use JSON as the default schema when producing or consuming messages of type `Foo`:

[source,java,indent=0,subs="verbatim"]
----
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ NOTE: The `message-type` is the fully-qualified name of the message class.

WARNING: If the message (or the first message of a `Publisher` input) is `null`, the framework won't be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send `null` messages.

=== Specified via annotation

When no topic is passed into the API and there are no custom topic mappings configured, the system looks for a `@PulsarMessage` annotation on the class of the message being produced or consumed.
The default topic can be specified via the `topic` attribute on the annotation.

The following example configures the default topic to use when producing or consuming messages of type `Foo`:

[source,java,indent=0,subs="verbatim"]
----
@PulsarMessage(topic = "foo-topic")
record Foo(String value) {
}
----

=== Custom topic resolver
The preferred method of adding mappings is via the property mentioned above.
However, if more control is needed you can replace the default resolver by proving your own implementation, for example:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.apache.pulsar.common.schema.SchemaType;

/**
* Specifies default topic and schema info for a message class.
* <p>
* When a message class is marked with this annotation, the topic/schema resolution
* process will use the specified information to determine a topic/schema to use for the
* message in process.
*
* @author Aleksei Arsenev
* @author Chris Bono
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface PulsarMessage {

/**
* Default topic for the annotated message class.
* @return default topic for the annotated message class or empty string to indicate
* no default topic is specified
*/
String topic() default "";

/**
* Default schema type to use for the annotated message class.
* <p>
* Note that when this is set to {@code KEY_VALUE} you must specify the actual key and
* value information via the {@link #messageKeyType()} and
* {@link #messageValueSchemaType()} attributes, respectively.
* @return schema type to use for the annotated message class or {@code NONE} to
* indicate no default schema is specified
*/
SchemaType schemaType() default SchemaType.NONE;

/**
* The message key type when schema type is set to {@code KEY_VALUE}.
* <p>
* When the {@link #schemaType()} is not set to {@code KEY_VALUE} this attribute is
* ignored.
* @return message key type when using {@code KEY_VALUE} schema type
*/
Class<?> messageKeyType() default Void.class;

/**
* The default schema type to use for the value schema when {@link #schemaType()} is
* set to {@code KEY_VALUE}.
* <p>
* When the {@link #schemaType()} is not set to {@code KEY_VALUE} this attribute is
* ignored and the default schema type must be specified via the {@code schemaType}
* attribute.
* @return message value schema type when using {@code KEY_VALUE} schema type
*/
SchemaType messageValueSchemaType() default SchemaType.NONE;

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.springframework.core.ResolvableType;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.annotation.PulsarMessage;
import org.springframework.util.Assert;

/**
* Default schema resolver capable of handling basic message types.
Expand All @@ -52,6 +54,7 @@
* @author Soby Chacko
* @author Alexander Preuß
* @author Chris Bono
* @author Aleksei Arsenev
*/
public class DefaultSchemaResolver implements SchemaResolver {

Expand Down Expand Up @@ -89,6 +92,19 @@ public class DefaultSchemaResolver implements SchemaResolver {

private final Map<Class<?>, Schema<?>> customSchemaMappings = new LinkedHashMap<>();

private final PulsarMessageAnnotationRegistry pulsarMessageAnnotationRegistry = new PulsarMessageAnnotationRegistry();

private boolean usePulsarMessageAnnotations = true;

/**
* Sets whether to inspect message classes for the
* {@link PulsarMessage @PulsarMessage} annotation during schema resolution.
* @param usePulsarMessageAnnotations whether to inspect messages for the annotation
*/
public void usePulsarMessageAnnotations(boolean usePulsarMessageAnnotations) {
this.usePulsarMessageAnnotations = usePulsarMessageAnnotations;
}

/**
* Adds a custom mapping from message type to schema.
* @param messageType the message type
Expand Down Expand Up @@ -137,7 +153,18 @@ public <T> Resolved<Schema<T>> resolveSchema(@Nullable Class<?> messageClass, bo

@Nullable
protected Schema<?> getCustomSchemaOrMaybeDefault(@Nullable Class<?> messageClass, boolean returnDefault) {
// Check for custom schema mapping
Schema<?> schema = this.customSchemaMappings.get(messageClass);

// If no custom schema mapping found, look for @PulsarMessage (if enabled)
if (this.usePulsarMessageAnnotations && schema == null && messageClass != null) {
schema = getAnnotatedSchemaType(messageClass);
if (schema != null) {
this.addCustomSchemaMapping(messageClass, schema);
}
}

// If still no schema, possibly return a default
if (schema == null && returnDefault) {
if (messageClass != null) {
try {
Expand All @@ -152,6 +179,30 @@ protected Schema<?> getCustomSchemaOrMaybeDefault(@Nullable Class<?> messageClas
return schema;
}

// VisibleForTesting
Schema<?> getAnnotatedSchemaType(Class<?> messageClass) {
PulsarMessage annotation = this.pulsarMessageAnnotationRegistry.getAnnotationFor(messageClass).orElse(null);
if (annotation == null || annotation.schemaType() == SchemaType.NONE) {
return null;
}
var schemaType = annotation.schemaType();
if (schemaType != SchemaType.KEY_VALUE) {
return resolveSchema(annotation.schemaType(), messageClass, null).value().orElse(null);
}
// handle complicated key value
var messageKeyClass = annotation.messageKeyType();
Assert.state(messageKeyClass != Void.class,
"messageKeyClass can not be Void.class when using KEY_VALUE schema type");

var messageValueSchemaType = annotation.messageValueSchemaType();
Assert.state(messageValueSchemaType != SchemaType.NONE && messageValueSchemaType != SchemaType.KEY_VALUE,
() -> "messageValueSchemaType can not be NONE or KEY_VALUE when using KEY_VALUE schema type");

Schema<?> keySchema = this.resolveSchema(messageKeyClass).orElseThrow();
Schema<?> valueSchema = this.resolveSchema(messageValueSchemaType, messageClass, null).orElseThrow();
return Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
}

@Override
@SuppressWarnings("unchecked")
public <T> Resolved<Schema<T>> resolveSchema(SchemaType schemaType, @Nullable ResolvableType messageType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.Supplier;

import org.springframework.lang.Nullable;
import org.springframework.pulsar.annotation.PulsarMessage;
import org.springframework.util.StringUtils;

/**
Expand All @@ -32,11 +33,25 @@
* {@link #addCustomTopicMapping(Class, String)}.
*
* @author Chris Bono
* @author Aleksei Arsenev
*/
public class DefaultTopicResolver implements TopicResolver {

private final Map<Class<?>, String> customTopicMappings = new LinkedHashMap<>();

private final PulsarMessageAnnotationRegistry pulsarMessageAnnotationRegistry = new PulsarMessageAnnotationRegistry();

private boolean usePulsarMessageAnnotations = true;

/**
* Sets whether to inspect message classes for the
* {@link PulsarMessage @PulsarMessage} annotation during topic resolution.
* @param usePulsarMessageAnnotations whether to inspect messages for the annotation
*/
public void usePulsarMessageAnnotations(boolean usePulsarMessageAnnotations) {
this.usePulsarMessageAnnotations = usePulsarMessageAnnotations;
}

/**
* Adds a custom mapping from message type to topic.
* @param messageType the message type
Expand Down Expand Up @@ -100,9 +115,31 @@ protected Resolved<String> doResolveTopic(@Nullable String userSpecifiedTopic, @
if (messageType == null) {
return Resolved.failed("Topic must be specified when the message is null");
}
String topic = this.customTopicMappings.getOrDefault(messageType, defaultTopicSupplier.get());
// Check for custom topic mapping
String topic = this.customTopicMappings.get(messageType);

// If no custom topic mapping found, look for @PulsarMessage (if enabled)
if (this.usePulsarMessageAnnotations && topic == null) {
topic = getAnnotatedTopicInfo(messageType);
if (topic != null) {
this.addCustomTopicMapping(messageType, topic);
}
}

// If still no topic, consult the default topic supplier
if (topic == null) {
topic = defaultTopicSupplier.get();
}
return topic == null ? Resolved.failed("Topic must be specified when no default topic is configured")
: Resolved.of(topic);
}

// VisibleForTesting
String getAnnotatedTopicInfo(Class<?> messageType) {
return this.pulsarMessageAnnotationRegistry.getAnnotationFor(messageType)
.map(PulsarMessage::topic)
.filter(StringUtils::hasText)
.orElse(null);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.core;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.annotation.PulsarMessage;
import org.springframework.util.Assert;

/**
* A registry that holds the {@link PulsarMessage @PulsarMessage} annotations and each
* associated class that is marked with the annotation.
* <p>
* The annotations are looked up on-demand and the result is cached.
* <p>
* Once the cache reaches a {@link #maxNumberOfAnnotationsCached certain size} (default of
* {@link #DEFAULT_MAX_CACHE_SIZE}) it is cleared and the annotations will be looked up
* again the next time they are requested.
*
* @author Chris Bono
*/
class PulsarMessageAnnotationRegistry {

private static final int DEFAULT_MAX_CACHE_SIZE = 1000;

private final int maxNumberOfAnnotationsCached;

private final LogAccessor logger = new LogAccessor(this.getClass());

private ConcurrentHashMap<Class<?>, Optional<PulsarMessage>> annotationsByClass = new ConcurrentHashMap<>();

PulsarMessageAnnotationRegistry() {
this(DEFAULT_MAX_CACHE_SIZE);
}

PulsarMessageAnnotationRegistry(int maxNumberOfAnnotationsCached) {
Assert.state(maxNumberOfAnnotationsCached > 0, "maxNumberOfAnnotationsCached must be > 0");
this.maxNumberOfAnnotationsCached = maxNumberOfAnnotationsCached;
}

/**
* Gets the {@link PulsarMessage @PulsarMessage} on the specified class or empty if
* the class is not marked with the annotation.
* @param targetClass the class to check for the annotation
* @return an optional containing the annotation or empty if the class is not marked
* with the annotation.
*/
Optional<PulsarMessage> getAnnotationFor(Class<?> targetClass) {
var annotation = this.annotationsByClass.computeIfAbsent(targetClass, this::findAnnotationOn);
if (this.annotationsByClass.size() > this.maxNumberOfAnnotationsCached) {
this.logger
.info(() -> "Clearing cache - max entries exceeded (%d)".formatted(this.maxNumberOfAnnotationsCached));
this.annotationsByClass = new ConcurrentHashMap<>();
}
return annotation;
}

// VisibleForTesting
protected Optional<PulsarMessage> findAnnotationOn(Class<?> targetClass) {
this.logger.debug(() -> "Looking for @PulsarMessage on " + targetClass);
PulsarMessage annotation = AnnotationUtils.findAnnotation(targetClass, PulsarMessage.class);
return Optional.ofNullable(annotation);
}

}
Loading