Skip to content

Add support for AUTO_PRODUCE schema #572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
:spring-cloud-stream-docs: https://docs.spring.io/spring-cloud-stream/docs/{spring-cloud-stream-version}/reference/html/
:spring-cloud-function: https://spring.io/projects/spring-cloud-function

:apache-pulsar-docs: https://pulsar.apache.org/docs/3.1.x
:apache-pulsar-docs: https://pulsar.apache.org/docs/3.2.x
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[UNRELATED] Bump in docs to current version

:apache-pulsar-cient-docs: {apache-pulsar-docs}/client-libraries-java
:apache-pulsar-io-docs: {apache-pulsar-docs}/io-connectors
:apache-pulsar-function-docs: {apache-pulsar-docs}/functions-overview
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[source,java,subs="attributes,verbatim"]
----
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
----
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[source,java,subs="attributes,verbatim"]
----
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
----
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@

include::../../attributes/attributes-variables.adoc[]

== Specifying Schema Information
If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data.
For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the `{template-class}`, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type.

IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding.
IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE w/ INLINE encoding.

=== Custom Schema Mapping
As an alternative to specifying the schema when invoking send operations on the `{template-class}` for complex types, the schema resolver can be configured with mappings for the types.
Expand All @@ -11,3 +14,14 @@ This removes the need to specify the schema as the framework consults the resolv
include::custom-schema-mapping.adoc[]

With this configuration in place, there is no need to set specify the schema on send operations.

=== Producing with AUTO_SCHEMA
If there is no chance to know the type of schema of a Pulsar topic in advance, you can use an {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] schema to publish a raw JSON or Avro payload as a `byte[]` safely.

In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic.

Simply specify a schema of `Schema.AUTO_PRODUCE_BYTES()` on your template send operations as shown in the example below:

include::{template-class}/template-snippet.adoc[]

NOTE: This is only supported with Avro and JSON schema types.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -47,9 +48,12 @@
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
import org.springframework.pulsar.test.support.model.UserRecord;
import org.springframework.util.function.ThrowingConsumer;

import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

/**
* Tests for {@link ReactivePulsarTemplate}.
Expand Down Expand Up @@ -157,8 +161,8 @@ void sendMessageWithMessageCustomizer() throws Exception {
.withMessageCustomizer((mb) -> mb.key("test-key"))
.send()
.subscribe();
Message<String> msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING,
"test-message", true);
Message<?> msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING, "test-message",
true);
assertThat(msg.getKey()).isEqualTo("test-key");
}

Expand All @@ -168,15 +172,15 @@ void sendMessageWithSenderCustomizer() throws Exception {
.withSenderCustomizer((sb) -> sb.producerName("test-producer"))
.send()
.subscribe();
Message<String> msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING,
"test-message", true);
Message<?> msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING, "test-message",
true);
assertThat(msg.getProducerName()).isEqualTo("test-producer");
}

@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaultTopic) throws Exception {
String topic = "ptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic";
String topic = "rptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic";
ReactivePulsarSenderFactory<Foo> producerFactory = DefaultReactivePulsarSenderFactory.<Foo>builderFor(client)
.withDefaultTopic(producerFactoryHasDefaultTopic ? "fake-topic" : null)
.build();
Expand All @@ -200,25 +204,25 @@ void sendMessageWithoutTopicFails() {
.withMessage("Topic must be specified when no default topic is configured");
}

private <T> Message<T> sendAndConsume(Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic,
Schema<T> schema, @Nullable T expectedValue, Boolean withDefaultTopic) throws Exception {
private <T, V> Message<?> sendAndConsume(Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic,
Schema<V> schema, @Nullable V expectedValue, Boolean withDefaultTopic) throws Exception {
ReactivePulsarSenderFactory<T> senderFactory = DefaultReactivePulsarSenderFactory.<T>builderFor(client)
.withDefaultTopic(withDefaultTopic ? topic : null)
.build();
ReactivePulsarTemplate<T> pulsarTemplate = new ReactivePulsarTemplate<>(senderFactory);
return sendAndConsume(pulsarTemplate, sendFunction, topic, schema, expectedValue);
}

private <T> Message<T> sendAndConsume(ReactivePulsarTemplate<T> template,
Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic, Schema<T> schema, @Nullable T expectedValue)
private <T, V> Message<?> sendAndConsume(ReactivePulsarTemplate<T> template,
Consumer<ReactivePulsarTemplate<T>> sendFunction, String topic, Schema<V> schema, @Nullable V expectedValue)
throws Exception {
try (org.apache.pulsar.client.api.Consumer<T> consumer = client.newConsumer(schema)
try (org.apache.pulsar.client.api.Consumer<V> consumer = client.newConsumer(schema)
.topic(topic)
.subscriptionName(topic + "-sub")
.subscribe()) {
sendFunction.accept(template);

Message<T> msg = consumer.receive(3, TimeUnit.SECONDS);
Message<?> msg = consumer.receive(3, TimeUnit.SECONDS);
consumer.acknowledge(msg);
assertThat(msg).isNotNull();
assertThat(msg.getValue()).isEqualTo(expectedValue);
return msg;
Expand All @@ -230,7 +234,7 @@ class SendNonPrimitiveSchemaTests {

@Test
void withSpecifiedSchema() throws Exception {
String topic = "ptt-specificSchema-topic";
String topic = "rptt-specificSchema-topic";
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (
template) -> template.send(foo, Schema.AVRO(Foo.class)).subscribe();
Expand All @@ -239,15 +243,15 @@ void withSpecifiedSchema() throws Exception {

@Test
void withSchemaInferredByMessageType() throws Exception {
String topic = "ptt-nospecificSchema-topic";
String topic = "rptt-nospecificSchema-topic";
Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID());
ThrowingConsumer<ReactivePulsarTemplate<Foo>> sendFunction = (template) -> template.send(foo).subscribe();
sendAndConsume(sendFunction, topic, Schema.JSON(Foo.class), foo, true);
}

@Test
void withSchemaInferredByTypeMappings() throws Exception {
String topic = "ptt-schemaInferred-topic";
String topic = "rptt-schemaInferred-topic";
ReactivePulsarSenderFactory<Foo> producerFactory = DefaultReactivePulsarSenderFactory
.<Foo>builderFor(client)
.withDefaultTopic(topic)
Expand Down Expand Up @@ -293,6 +297,41 @@ void sendNullWithoutSchemaFails() {

}

@Nested
class SendAutoProduceSchemaTests {

@Test
void withJsonSchema() throws Exception {
var topic = "rptt-auto-json-topic";

// First send to the topic as JSON to establish the schema for the topic
var userJsonSchema = Schema.JSON(UserRecord.class);
var user = new UserRecord("Jason", 5150);
ThrowingConsumer<ReactivePulsarTemplate<UserRecord>> sendAsUserFunction = (
template) -> template.send(user, userJsonSchema).subscribe();
sendAndConsume(sendAsUserFunction, topic, userJsonSchema, user, true);

// Next send another user using byte[] with AUTO_PRODUCE - it should be
// consumed fine
var user2 = new UserRecord("Who", 6160);
var user2Bytes = new ObjectMapper().writeValueAsBytes(user2);
ThrowingConsumer<ReactivePulsarTemplate<byte[]>> sendAsBytesFunction = (
template) -> template.send(user2Bytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
sendAndConsume(sendAsBytesFunction, topic, userJsonSchema, user2, true);

// Finally send another user using byte[] with AUTO_PRODUCE w/ invalid payload
// - it should be rejected
var bytesSenderFactory = DefaultReactivePulsarSenderFactory.<byte[]>builderFor(client)
.withDefaultTopic(topic)
.build();
var bytesTemplate = new ReactivePulsarTemplate<>(bytesSenderFactory);

StepVerifier.create(bytesTemplate.send("invalid-payload".getBytes(), Schema.AUTO_PRODUCE_BYTES()))
.expectError(SchemaSerializationException.class);
}

}

public static class Foo {

private String foo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
Expand Down Expand Up @@ -168,6 +170,8 @@ private void closeProducer(Producer<T> producer, boolean async) {
*/
static class ProducerCacheKey<T> {

private static final SchemaHash AUTO_PRODUCE_SCHEMA_HASH = SchemaHash.of(new byte[0], SchemaType.AUTO_PUBLISH);

private final Schema<T> schema;

private final SchemaHash schemaHash;
Expand All @@ -193,7 +197,8 @@ static class ProducerCacheKey<T> {
Assert.notNull(schema, () -> "'schema' must be non-null");
Assert.notNull(topic, () -> "'topic' must be non-null");
this.schema = schema;
this.schemaHash = SchemaHash.of(this.schema);
this.schemaHash = (schema instanceof AutoProduceBytesSchema) ? AUTO_PRODUCE_SCHEMA_HASH
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary as any call to AutoProduceSchema.getSchemaInfo() will throw RuntimeException on the client until after something has been sent and validated w/ the target topic (i.e a schema is initialized on it). In our case, we don't care about schema info, the producer can be used for any byte[] messages that the user wants to be validated against the target topic schema and this cache key will do that.

: SchemaHash.of(this.schema);
this.topic = topic;
this.encryptionKeys = encryptionKeys;
this.customizers = customizers;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,6 +49,7 @@
import org.springframework.pulsar.cache.provider.CacheProvider;
import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerCacheKey;
import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerWithCloseCallback;
import org.springframework.pulsar.test.support.model.UserPojo;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ObjectUtils;

Expand All @@ -74,7 +75,7 @@ void cleanupFromTests() {
}

@Test
void createProducerMultipleCalls() throws PulsarClientException {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[UNRELATED] Leftover from removal of checked exception in Spring Pulsar APIs.

void createProducerMultipleCalls() {
var producerFactory = newProducerFactory();
var cacheKey = new ProducerCacheKey<>(schema, "topic1", null, null);
var producer1 = producerFactory.createProducer(schema, "topic1");
Expand Down Expand Up @@ -103,7 +104,7 @@ void cachedProducerIsCloseSafeWrapper() throws PulsarClientException {

@SuppressWarnings("resource")
@Test
void createProducerWithMatrixOfCacheKeys() throws PulsarClientException {
void createProducerWithMatrixOfCacheKeys() {
String topic1 = "topic1";
String topic2 = "topic2";
var schema1 = new StringSchema();
Expand Down Expand Up @@ -167,7 +168,7 @@ void createProducerWithMatrixOfCacheKeys() throws PulsarClientException {
}

@Test
void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientException {
void factoryDestroyCleansUpCacheAndClosesProducers() {
CachingPulsarProducerFactory<String> producerFactory = producerFactory(pulsarClient, null, null);
var actualProducer1 = actualProducer(producerFactory.createProducer(schema, "topic1"));
var actualProducer2 = actualProducer(producerFactory.createProducer(schema, "topic2"));
Expand All @@ -183,7 +184,7 @@ void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientExceptio
}

@Test
void producerEvictedFromCache() throws PulsarClientException {
void producerEvictedFromCache() {
CachingPulsarProducerFactory<String> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, null,
null, new DefaultTopicResolver(), Duration.ofSeconds(3L), 10L, 2);
var actualProducer = actualProducer(producerFactory.createProducer(schema, "topic1"));
Expand Down Expand Up @@ -306,7 +307,21 @@ static Stream<Arguments> equalsAndHashCodeTestProvider() {
arguments(
Named.of("differentNullInterceptor",
new ProducerCacheKey<>(Schema.STRING, "topic1", encryptionKeys1, customizers1)),
new ProducerCacheKey<>(Schema.STRING, "topic1", null, null), false));
new ProducerCacheKey<>(Schema.STRING, "topic1", null, null), false),
arguments(
Named.of("sameAutoProduceSchemaSameTopic",
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)),
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null), true),
arguments(
Named.of("autoProduceSchemaOneWithAndOneWithoutSchemaInfo",
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)),
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(UserPojo.class)), "topic1",
null, null),
true),
arguments(
Named.of("sameAutoProduceSchemaDifferentTopic",
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)),
new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic2", null, null), false));
}

}
Expand All @@ -315,7 +330,7 @@ static Stream<Arguments> equalsAndHashCodeTestProvider() {
class RestartFactoryTests {

@Test
void restartLifecycle() throws PulsarClientException {
void restartLifecycle() {
var producerFactory = (CachingPulsarProducerFactory<String>) producerFactory(pulsarClient, null, null);
producerFactory.start();
var producer1 = producerFactory.createProducer(schema, "topic1");
Expand Down
Loading