Skip to content

Commit 1c31e6c

Browse files
committed
Polish "Add @PulsarTypeMapping for default topi.."
* Introduce a PulsarTypeMappingRegistry to cache annotations * Expand on the topic/schema resolver tests * Add support for KEY_VALUE on annotation
1 parent 9f8ece6 commit 1c31e6c

File tree

9 files changed

+461
-29
lines changed

9 files changed

+461
-29
lines changed

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/custom-schema-mapping.adoc

+17
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
==== Configuration properties
12
Schema mappings can be configured with the `spring.pulsar.defaults.type-mappings` property.
23
The following example uses `application.yml` to add mappings for the `User` and `Address` complex objects using `AVRO` and `JSON` schemas, respectively:
34

@@ -17,6 +18,7 @@ spring:
1718

1819
NOTE: The `message-type` is the fully-qualified name of the message class.
1920

21+
==== Schema resolver customizer
2022
The preferred method of adding mappings is via the property mentioned above.
2123
However, if more control is needed you can provide a schema resolver customizer to add the mapping(s).
2224

@@ -32,3 +34,18 @@ public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer(
3234
}
3335
}
3436
----
37+
38+
==== Type mapping annotation
39+
Another option for specifying default schema information to use for a particular message type is to mark the message class with the `@PulsarTypeMapping` annotation.
40+
The schema info can be specified via the `schemaType` attribute on the annotation.
41+
42+
The following example configures the system to use JSON as the default schema when producing or consuming messages of type `Foo`:
43+
44+
[source,java,indent=0,subs="verbatim"]
45+
----
46+
@PulsarTypeMapping(schemaType = SchemaType.JSON)
47+
record Foo(String value) {
48+
}
49+
----
50+
51+
NOTE: The annotations are looked up on-demand and their result is cached. However, there is still a small performance hit on the first lookup. If you want to disable this feature you can invoke the `usePulsarTypeMappingAnnotations(false)` method on the `DefaultSchemaResolver`.

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/topic-resolution.adoc

+9-3
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,21 @@ NOTE: The `message-type` is the fully-qualified name of the message class.
3535
WARNING: If the message (or the first message of a `Publisher` input) is `null`, the framework won't be able to determine the topic from it. Another method shall be used to specify the topic if your application is likely to send `null` messages.
3636

3737
=== Specified via annotation
38-
When no topic passed into API and no mappings configured, the system looks for `PulsarTopic` annotation. The following example configures topic for `Baz` class using annotation:
38+
39+
When no topic is passed into the API and there are no custom topic mappings configured, the system looks for a `@PulsarTypeMapping` annotation on the class of the message being produced or consumed.
40+
The default topic can be specified via the `topic` attribute on the annotation.
41+
42+
The following example configures the default topic to use when producing or consuming messages of type `Foo`:
3943

4044
[source,java,indent=0,subs="verbatim"]
4145
----
42-
@PulsarTopic("baz-topic")
43-
record Baz(String value) {
46+
@PulsarTypeMapping(topic = "foo-topic")
47+
record Foo(String value) {
4448
}
4549
----
4650

51+
NOTE: The annotations are looked up on-demand and their result is cached. However, there is still a small performance hit on the first lookup. If you want to disable this feature you can invoke the `usePulsarTypeMappingAnnotations(false)` method on the `DefaultTopicResolver`.
52+
4753
=== Custom topic resolver
4854
The preferred method of adding mappings is via the property mentioned above.
4955
However, if more control is needed you can replace the default resolver by proving your own implementation, for example:

spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarTypeMapping.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,65 @@
1616

1717
package org.springframework.pulsar.annotation;
1818

19-
import org.apache.pulsar.common.schema.SchemaType;
20-
2119
import java.lang.annotation.Documented;
2220
import java.lang.annotation.ElementType;
2321
import java.lang.annotation.Retention;
2422
import java.lang.annotation.RetentionPolicy;
2523
import java.lang.annotation.Target;
2624

25+
import org.apache.pulsar.common.schema.SchemaType;
26+
2727
/**
28-
* Specifies default topic and schema for class.
28+
* Specifies default topic and schema info for a message class.
29+
* <p>
30+
* When a message class is marked with this annotation, the topic/schema resolution
31+
* process will use the specified information to determine a topic/schema to use for the
32+
* message in process.
2933
*
3034
* @author Aleksei Arsenev
35+
* @author Chris Bono
3136
*/
3237
@Target(ElementType.TYPE)
3338
@Retention(RetentionPolicy.RUNTIME)
3439
@Documented
3540
public @interface PulsarTypeMapping {
3641

3742
/**
38-
* Default topic for class.
39-
* @return topic
43+
* Default topic for the annotated message class.
44+
* @return default topic for the annotated message class or empty string to indicate
45+
* no default topic is specified
4046
*/
4147
String topic() default "";
4248

4349
/**
44-
* Default schema type for class.
45-
* @return schema type
50+
* Default schema type to use for the annotated message class.
51+
* <p>
52+
* Note that when this is set to {@code KEY_VALUE} you must specify the actual key and
53+
* value information via the {@link #messageKeyType()} and
54+
* {@link #messageValueSchemaType()} attributes, respectively.
55+
* @return schema type to use for the annotated message class or {@code NONE} to
56+
* indicate no default schema is specified
4657
*/
4758
SchemaType schemaType() default SchemaType.NONE;
4859

4960
/**
50-
* Message key type (must be specified when schema type is {@code KEY_VALUE})
51-
* @return message key type
61+
* The message key type when schema type is set to {@code KEY_VALUE}.
62+
* <p>
63+
* When the {@link #schemaType()} is not set to {@code KEY_VALUE} this attribute is
64+
* ignored.
65+
* @return message key type when using {@code KEY_VALUE} schema type
5266
*/
5367
Class<?> messageKeyType() default Void.class;
5468

69+
/**
70+
* The default schema type to use for the value schema when {@link #schemaType()} is
71+
* set to {@code KEY_VALUE}.
72+
* <p>
73+
* When the {@link #schemaType()} is not set to {@code KEY_VALUE} this attribute is
74+
* ignored and the default schema type must be specified via the {@code schemaType}
75+
* attribute.
76+
* @return message value schema type when using {@code KEY_VALUE} schema type
77+
*/
78+
SchemaType messageValueSchemaType() default SchemaType.NONE;
79+
5580
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java

+49-7
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@
3939
import org.apache.pulsar.common.schema.SchemaType;
4040

4141
import org.springframework.core.ResolvableType;
42-
import org.springframework.core.annotation.AnnotationUtils;
4342
import org.springframework.core.log.LogAccessor;
4443
import org.springframework.lang.Nullable;
4544
import org.springframework.pulsar.annotation.PulsarTypeMapping;
45+
import org.springframework.util.Assert;
4646

4747
/**
4848
* Default schema resolver capable of handling basic message types.
@@ -54,6 +54,7 @@
5454
* @author Soby Chacko
5555
* @author Alexander Preuß
5656
* @author Chris Bono
57+
* @author Aleksei Arsenev
5758
*/
5859
public class DefaultSchemaResolver implements SchemaResolver {
5960

@@ -91,6 +92,20 @@ public class DefaultSchemaResolver implements SchemaResolver {
9192

9293
private final Map<Class<?>, Schema<?>> customSchemaMappings = new LinkedHashMap<>();
9394

95+
private final PulsarTypeMappingRegistry pulsarTypeMappingRegistry = new PulsarTypeMappingRegistry();
96+
97+
private boolean usePulsarTypeMappingAnnotations = true;
98+
99+
/**
100+
* Sets whether to inspect message classes for the
101+
* {@link PulsarTypeMapping @PulsarTypeMapping} annotation during schema resolution.
102+
* @param usePulsarTypeMappingAnnotations whether to inspect messages for the
103+
* annotation
104+
*/
105+
public void usePulsarTypeMappingAnnotations(boolean usePulsarTypeMappingAnnotations) {
106+
this.usePulsarTypeMappingAnnotations = usePulsarTypeMappingAnnotations;
107+
}
108+
94109
/**
95110
* Adds a custom mapping from message type to schema.
96111
* @param messageType the message type
@@ -139,15 +154,18 @@ public <T> Resolved<Schema<T>> resolveSchema(@Nullable Class<?> messageClass, bo
139154

140155
@Nullable
141156
protected Schema<?> getCustomSchemaOrMaybeDefault(@Nullable Class<?> messageClass, boolean returnDefault) {
157+
// Check for custom schema mapping
142158
Schema<?> schema = this.customSchemaMappings.get(messageClass);
143-
if (schema == null && messageClass != null) {
144-
PulsarTypeMapping annotation = AnnotationUtils.findAnnotation(messageClass, PulsarTypeMapping.class);
145-
if (annotation != null && annotation.schemaType() != SchemaType.NONE) {
146-
var resolvedSchema = resolveSchema(annotation.schemaType(), messageClass, annotation.messageKeyType());
147-
resolvedSchema.ifResolved(objectSchema -> addCustomSchemaMapping(messageClass, objectSchema));
148-
schema = resolvedSchema.get().orElse(null);
159+
160+
// If no custom schema mapping found, look for @PulsarTypeMapping (if enabled)
161+
if (this.usePulsarTypeMappingAnnotations && schema == null && messageClass != null) {
162+
schema = getAnnotatedSchemaType(messageClass);
163+
if (schema != null) {
164+
this.addCustomSchemaMapping(messageClass, schema);
149165
}
150166
}
167+
168+
// If still no schema, possibly return a default
151169
if (schema == null && returnDefault) {
152170
if (messageClass != null) {
153171
try {
@@ -162,6 +180,30 @@ protected Schema<?> getCustomSchemaOrMaybeDefault(@Nullable Class<?> messageClas
162180
return schema;
163181
}
164182

183+
// VisibleForTesting
184+
Schema<?> getAnnotatedSchemaType(Class<?> messageClass) {
185+
PulsarTypeMapping annotation = this.pulsarTypeMappingRegistry.getTypeMappingFor(messageClass).orElse(null);
186+
if (annotation == null || annotation.schemaType() == SchemaType.NONE) {
187+
return null;
188+
}
189+
var schemaType = annotation.schemaType();
190+
if (schemaType != SchemaType.KEY_VALUE) {
191+
return resolveSchema(annotation.schemaType(), messageClass, null).value().orElse(null);
192+
}
193+
// handle complicated key value
194+
var messageKeyClass = annotation.messageKeyType();
195+
Assert.state(messageKeyClass != Void.class,
196+
"messageKeyClass can not be Void.class when using KEY_VALUE schema type");
197+
198+
var messageValueSchemaType = annotation.messageValueSchemaType();
199+
Assert.state(messageValueSchemaType != SchemaType.NONE && messageValueSchemaType != SchemaType.KEY_VALUE,
200+
() -> "messageValueSchemaType can not be NONE or KEY_VALUE when using KEY_VALUE schema type");
201+
202+
Schema<?> keySchema = this.resolveSchema(messageKeyClass).orElseThrow();
203+
Schema<?> valueSchema = this.resolveSchema(messageValueSchemaType, messageClass, null).orElseThrow();
204+
return Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
205+
}
206+
165207
@Override
166208
@SuppressWarnings("unchecked")
167209
public <T> Resolved<Schema<T>> resolveSchema(SchemaType schemaType, @Nullable ResolvableType messageType) {

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultTopicResolver.java

+33-9
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
package org.springframework.pulsar.core;
1818

1919
import java.util.Collections;
20+
import java.util.LinkedHashMap;
2021
import java.util.Map;
21-
import java.util.concurrent.ConcurrentHashMap;
2222
import java.util.function.Supplier;
2323

24-
import org.springframework.core.annotation.AnnotationUtils;
2524
import org.springframework.lang.Nullable;
2625
import org.springframework.pulsar.annotation.PulsarTypeMapping;
2726
import org.springframework.util.StringUtils;
@@ -34,10 +33,25 @@
3433
* {@link #addCustomTopicMapping(Class, String)}.
3534
*
3635
* @author Chris Bono
36+
* @author Aleksei Arsenev
3737
*/
3838
public class DefaultTopicResolver implements TopicResolver {
3939

40-
private final Map<Class<?>, String> customTopicMappings = new ConcurrentHashMap<>();
40+
private final Map<Class<?>, String> customTopicMappings = new LinkedHashMap<>();
41+
42+
private final PulsarTypeMappingRegistry pulsarTypeMappingRegistry = new PulsarTypeMappingRegistry();
43+
44+
private boolean usePulsarTypeMappingAnnotations = true;
45+
46+
/**
47+
* Sets whether to inspect message classes for the
48+
* {@link PulsarTypeMapping @PulsarTypeMapping} annotation during topic resolution.
49+
* @param usePulsarTypeMappingAnnotations whether to inspect messages for the
50+
* annotation
51+
*/
52+
public void usePulsarTypeMappingAnnotations(boolean usePulsarTypeMappingAnnotations) {
53+
this.usePulsarTypeMappingAnnotations = usePulsarTypeMappingAnnotations;
54+
}
4155

4256
/**
4357
* Adds a custom mapping from message type to topic.
@@ -102,21 +116,31 @@ protected Resolved<String> doResolveTopic(@Nullable String userSpecifiedTopic, @
102116
if (messageType == null) {
103117
return Resolved.failed("Topic must be specified when the message is null");
104118
}
119+
// Check for custom topic mapping
120+
String topic = this.customTopicMappings.get(messageType);
105121

106-
String topic = this.getCustomTopicMappings().get(messageType);
107-
if (topic == null) {
108-
PulsarTypeMapping annotation = AnnotationUtils.findAnnotation(messageType, PulsarTypeMapping.class);
109-
if (annotation != null && !annotation.topic().isBlank()) {
110-
this.addCustomTopicMapping(messageType, annotation.topic());
111-
topic = annotation.topic();
122+
// If no custom topic mapping found, look for @PulsarTypeMapping (if enabled)
123+
if (this.usePulsarTypeMappingAnnotations && topic == null) {
124+
topic = getAnnotatedTopicInfo(messageType);
125+
if (topic != null) {
126+
this.addCustomTopicMapping(messageType, topic);
112127
}
113128
}
114129

130+
// If still no topic, consult the default topic supplier
115131
if (topic == null) {
116132
topic = defaultTopicSupplier.get();
117133
}
118134
return topic == null ? Resolved.failed("Topic must be specified when no default topic is configured")
119135
: Resolved.of(topic);
120136
}
121137

138+
// VisibleForTesting
139+
String getAnnotatedTopicInfo(Class<?> messageType) {
140+
return this.pulsarTypeMappingRegistry.getTypeMappingFor(messageType)
141+
.map(PulsarTypeMapping::topic)
142+
.filter(StringUtils::hasText)
143+
.orElse(null);
144+
}
145+
122146
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2023-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.core;
18+
19+
import java.util.Optional;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
22+
import org.springframework.core.annotation.AnnotationUtils;
23+
import org.springframework.core.log.LogAccessor;
24+
import org.springframework.pulsar.annotation.PulsarTypeMapping;
25+
import org.springframework.util.Assert;
26+
27+
/**
28+
* A registry that holds the {@link PulsarTypeMapping @PulsarTypeMapping} annotations and
29+
* each associated class that is marked with the annotation.
30+
* <p>
31+
* The annotations are looked up on-demand and the result is cached.
32+
* <p>
33+
* Once the cache reaches a {@link #maxNumberOfMappingsCached certain size} (default of
34+
* {@link #DEFAULT_MAX_CACHE_SIZE}) it is cleared and the annotations will be looked up
35+
* again the next time they are requested.
36+
*
37+
* @author Chris Bono
38+
*/
39+
class PulsarTypeMappingRegistry {
40+
41+
private static final int DEFAULT_MAX_CACHE_SIZE = 1000;
42+
43+
private final int maxNumberOfMappingsCached;
44+
45+
private final LogAccessor logger = new LogAccessor(this.getClass());
46+
47+
private ConcurrentHashMap<Class<?>, Optional<PulsarTypeMapping>> typeMappingsByClass = new ConcurrentHashMap<>();
48+
49+
PulsarTypeMappingRegistry() {
50+
this(DEFAULT_MAX_CACHE_SIZE);
51+
}
52+
53+
PulsarTypeMappingRegistry(int maxNumberOfMappingsCached) {
54+
Assert.state(maxNumberOfMappingsCached > 0, "maxNumberOfMappingsCached must be > 0");
55+
this.maxNumberOfMappingsCached = maxNumberOfMappingsCached;
56+
}
57+
58+
/**
59+
* Gets the {@link PulsarTypeMapping @PulsarTypeMapping} on the specified class or
60+
* empty if the class is not marked with the annotation.
61+
* @param targetClass the class to check for the annotation
62+
* @return an optional containing the annotation or empty if the class is not marked
63+
* with the annotation.
64+
*/
65+
Optional<PulsarTypeMapping> getTypeMappingFor(Class<?> targetClass) {
66+
var optionalTypeMapping = this.typeMappingsByClass.computeIfAbsent(targetClass, this::findTypeMappingOn);
67+
if (this.typeMappingsByClass.size() > this.maxNumberOfMappingsCached) {
68+
this.logger
69+
.info(() -> "Clearing cache - max entries exceeded (%d)".formatted(this.maxNumberOfMappingsCached));
70+
this.typeMappingsByClass = new ConcurrentHashMap<>();
71+
}
72+
return optionalTypeMapping;
73+
}
74+
75+
// VisibleForTesting
76+
protected Optional<PulsarTypeMapping> findTypeMappingOn(Class<?> targetClass) {
77+
this.logger.debug(() -> "Looking for @PulsarTypeMapping on " + targetClass);
78+
PulsarTypeMapping annotation = AnnotationUtils.findAnnotation(targetClass, PulsarTypeMapping.class);
79+
return Optional.ofNullable(annotation);
80+
}
81+
82+
}

0 commit comments

Comments
 (0)