diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/attributes/attributes-variables.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/attributes/attributes-variables.adoc index cc918b8d3..285e163cb 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/attributes/attributes-variables.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/attributes/attributes-variables.adoc @@ -14,7 +14,7 @@ :spring-cloud-stream-docs: https://docs.spring.io/spring-cloud-stream/docs/{spring-cloud-stream-version}/reference/html/ :spring-cloud-function: https://spring.io/projects/spring-cloud-function -:apache-pulsar-docs: https://pulsar.apache.org/docs/3.1.x +:apache-pulsar-docs: https://pulsar.apache.org/docs/3.2.x :apache-pulsar-cient-docs: {apache-pulsar-docs}/client-libraries-java :apache-pulsar-io-docs: {apache-pulsar-docs}/io-connectors :apache-pulsar-function-docs: {apache-pulsar-docs}/functions-overview diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/PulsarTemplate/template-snippet.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/PulsarTemplate/template-snippet.adoc new file mode 100644 index 000000000..fb77cc517 --- /dev/null +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/PulsarTemplate/template-snippet.adoc @@ -0,0 +1,6 @@ +[source,java,subs="attributes,verbatim"] +---- +void sendUserAsBytes(PulsarTemplate template, byte[] userAsBytes) { + template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()); +} +---- diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/ReactivePulsarTemplate/template-snippet.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/ReactivePulsarTemplate/template-snippet.adoc new file mode 100644 index 000000000..7fe8f52bf --- /dev/null +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/ReactivePulsarTemplate/template-snippet.adoc @@ -0,0 +1,6 @@ +[source,java,subs="attributes,verbatim"] +---- +void sendUserAsBytes(ReactivePulsarTemplate template, byte[] userAsBytes) { + template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe(); +} +---- diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-template.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-template.adoc index c47d6d768..6b607736f 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-template.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/schema-info/schema-info-template.adoc @@ -1,8 +1,11 @@ + +include::../../attributes/attributes-variables.adoc[] + == Specifying Schema Information If you use Java primitive types, the framework auto-detects the schema for you, and you need not specify any schema types for publishing the data. For non-primitive types, if the Schema is not explicitly specified when invoking send operations on the `{template-class}`, the Spring for Apache Pulsar framework will try to build a `Schema.JSON` from the type. -IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, and KEY_VALUE w/ INLINE encoding. +IMPORTANT: Complex Schema types that are currently supported are JSON, AVRO, PROTOBUF, AUTO_PRODUCE_BYTES, and KEY_VALUE w/ INLINE encoding. === Custom Schema Mapping As an alternative to specifying the schema when invoking send operations on the `{template-class}` for complex types, the schema resolver can be configured with mappings for the types. @@ -11,3 +14,14 @@ This removes the need to specify the schema as the framework consults the resolv include::custom-schema-mapping.adoc[] With this configuration in place, there is no need to set specify the schema on send operations. + +=== Producing with AUTO_SCHEMA +If there is no chance to know the type of schema of a Pulsar topic in advance, you can use an {apache-pulsar-docs}/schema-get-started/#auto_produce[AUTO_PRODUCE] schema to publish a raw JSON or Avro payload as a `byte[]` safely. + +In this case, the producer validates whether the outbound bytes are compatible with the schema of the destination topic. + +Simply specify a schema of `Schema.AUTO_PRODUCE_BYTES()` on your template send operations as shown in the example below: + +include::{template-class}/template-snippet.adoc[] + +NOTE: This is only supported with Avro and JSON schema types. diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java index b85e1ac51..a518c1299 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/ReactivePulsarTemplateTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.reactive.client.api.MessageSpec; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterEach; @@ -47,9 +48,12 @@ import org.springframework.pulsar.core.DefaultSchemaResolver; import org.springframework.pulsar.core.DefaultTopicResolver; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; +import org.springframework.pulsar.test.support.model.UserRecord; import org.springframework.util.function.ThrowingConsumer; +import com.fasterxml.jackson.databind.ObjectMapper; import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; /** * Tests for {@link ReactivePulsarTemplate}. @@ -157,8 +161,8 @@ void sendMessageWithMessageCustomizer() throws Exception { .withMessageCustomizer((mb) -> mb.key("test-key")) .send() .subscribe(); - Message msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING, - "test-message", true); + Message msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING, "test-message", + true); assertThat(msg.getKey()).isEqualTo("test-key"); } @@ -168,15 +172,15 @@ void sendMessageWithSenderCustomizer() throws Exception { .withSenderCustomizer((sb) -> sb.producerName("test-producer")) .send() .subscribe(); - Message msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING, - "test-message", true); + Message msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING, "test-message", + true); assertThat(msg.getProducerName()).isEqualTo("test-producer"); } @ParameterizedTest @ValueSource(booleans = { true, false }) void sendMessageWithTopicInferredByTypeMappings(boolean producerFactoryHasDefaultTopic) throws Exception { - String topic = "ptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic"; + String topic = "rptt-topicInferred-" + producerFactoryHasDefaultTopic + "-topic"; ReactivePulsarSenderFactory producerFactory = DefaultReactivePulsarSenderFactory.builderFor(client) .withDefaultTopic(producerFactoryHasDefaultTopic ? "fake-topic" : null) .build(); @@ -200,8 +204,8 @@ void sendMessageWithoutTopicFails() { .withMessage("Topic must be specified when no default topic is configured"); } - private Message sendAndConsume(Consumer> sendFunction, String topic, - Schema schema, @Nullable T expectedValue, Boolean withDefaultTopic) throws Exception { + private Message sendAndConsume(Consumer> sendFunction, String topic, + Schema schema, @Nullable V expectedValue, Boolean withDefaultTopic) throws Exception { ReactivePulsarSenderFactory senderFactory = DefaultReactivePulsarSenderFactory.builderFor(client) .withDefaultTopic(withDefaultTopic ? topic : null) .build(); @@ -209,16 +213,16 @@ private Message sendAndConsume(Consumer> sendFu return sendAndConsume(pulsarTemplate, sendFunction, topic, schema, expectedValue); } - private Message sendAndConsume(ReactivePulsarTemplate template, - Consumer> sendFunction, String topic, Schema schema, @Nullable T expectedValue) + private Message sendAndConsume(ReactivePulsarTemplate template, + Consumer> sendFunction, String topic, Schema schema, @Nullable V expectedValue) throws Exception { - try (org.apache.pulsar.client.api.Consumer consumer = client.newConsumer(schema) + try (org.apache.pulsar.client.api.Consumer consumer = client.newConsumer(schema) .topic(topic) .subscriptionName(topic + "-sub") .subscribe()) { sendFunction.accept(template); - - Message msg = consumer.receive(3, TimeUnit.SECONDS); + Message msg = consumer.receive(3, TimeUnit.SECONDS); + consumer.acknowledge(msg); assertThat(msg).isNotNull(); assertThat(msg.getValue()).isEqualTo(expectedValue); return msg; @@ -230,7 +234,7 @@ class SendNonPrimitiveSchemaTests { @Test void withSpecifiedSchema() throws Exception { - String topic = "ptt-specificSchema-topic"; + String topic = "rptt-specificSchema-topic"; Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID()); ThrowingConsumer> sendFunction = ( template) -> template.send(foo, Schema.AVRO(Foo.class)).subscribe(); @@ -239,7 +243,7 @@ void withSpecifiedSchema() throws Exception { @Test void withSchemaInferredByMessageType() throws Exception { - String topic = "ptt-nospecificSchema-topic"; + String topic = "rptt-nospecificSchema-topic"; Foo foo = new Foo("Foo-" + UUID.randomUUID(), "Bar-" + UUID.randomUUID()); ThrowingConsumer> sendFunction = (template) -> template.send(foo).subscribe(); sendAndConsume(sendFunction, topic, Schema.JSON(Foo.class), foo, true); @@ -247,7 +251,7 @@ void withSchemaInferredByMessageType() throws Exception { @Test void withSchemaInferredByTypeMappings() throws Exception { - String topic = "ptt-schemaInferred-topic"; + String topic = "rptt-schemaInferred-topic"; ReactivePulsarSenderFactory producerFactory = DefaultReactivePulsarSenderFactory .builderFor(client) .withDefaultTopic(topic) @@ -293,6 +297,41 @@ void sendNullWithoutSchemaFails() { } + @Nested + class SendAutoProduceSchemaTests { + + @Test + void withJsonSchema() throws Exception { + var topic = "rptt-auto-json-topic"; + + // First send to the topic as JSON to establish the schema for the topic + var userJsonSchema = Schema.JSON(UserRecord.class); + var user = new UserRecord("Jason", 5150); + ThrowingConsumer> sendAsUserFunction = ( + template) -> template.send(user, userJsonSchema).subscribe(); + sendAndConsume(sendAsUserFunction, topic, userJsonSchema, user, true); + + // Next send another user using byte[] with AUTO_PRODUCE - it should be + // consumed fine + var user2 = new UserRecord("Who", 6160); + var user2Bytes = new ObjectMapper().writeValueAsBytes(user2); + ThrowingConsumer> sendAsBytesFunction = ( + template) -> template.send(user2Bytes, Schema.AUTO_PRODUCE_BYTES()).subscribe(); + sendAndConsume(sendAsBytesFunction, topic, userJsonSchema, user2, true); + + // Finally send another user using byte[] with AUTO_PRODUCE w/ invalid payload + // - it should be rejected + var bytesSenderFactory = DefaultReactivePulsarSenderFactory.builderFor(client) + .withDefaultTopic(topic) + .build(); + var bytesTemplate = new ReactivePulsarTemplate<>(bytesSenderFactory); + + StepVerifier.create(bytesTemplate.send("invalid-payload".getBytes(), Schema.AUTO_PRODUCE_BYTES())) + .expectError(SchemaSerializationException.class); + } + + } + public static class Foo { private String foo; diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java index 49665586d..261b72669 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/CachingPulsarProducerFactory.java @@ -34,7 +34,9 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; import org.apache.pulsar.common.protocol.schema.SchemaHash; +import org.apache.pulsar.common.schema.SchemaType; import org.springframework.beans.factory.DisposableBean; import org.springframework.core.log.LogAccessor; @@ -168,6 +170,8 @@ private void closeProducer(Producer producer, boolean async) { */ static class ProducerCacheKey { + private static final SchemaHash AUTO_PRODUCE_SCHEMA_HASH = SchemaHash.of(new byte[0], SchemaType.AUTO_PUBLISH); + private final Schema schema; private final SchemaHash schemaHash; @@ -193,7 +197,8 @@ static class ProducerCacheKey { Assert.notNull(schema, () -> "'schema' must be non-null"); Assert.notNull(topic, () -> "'topic' must be non-null"); this.schema = schema; - this.schemaHash = SchemaHash.of(this.schema); + this.schemaHash = (schema instanceof AutoProduceBytesSchema) ? AUTO_PRODUCE_SCHEMA_HASH + : SchemaHash.of(this.schema); this.topic = topic; this.encryptionKeys = encryptionKeys; this.customizers = customizers; diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java index 8d0f5e210..a5b06b20b 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/CachingPulsarProducerFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ import org.springframework.pulsar.cache.provider.CacheProvider; import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerCacheKey; import org.springframework.pulsar.core.CachingPulsarProducerFactory.ProducerWithCloseCallback; +import org.springframework.pulsar.test.support.model.UserPojo; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ObjectUtils; @@ -74,7 +75,7 @@ void cleanupFromTests() { } @Test - void createProducerMultipleCalls() throws PulsarClientException { + void createProducerMultipleCalls() { var producerFactory = newProducerFactory(); var cacheKey = new ProducerCacheKey<>(schema, "topic1", null, null); var producer1 = producerFactory.createProducer(schema, "topic1"); @@ -103,7 +104,7 @@ void cachedProducerIsCloseSafeWrapper() throws PulsarClientException { @SuppressWarnings("resource") @Test - void createProducerWithMatrixOfCacheKeys() throws PulsarClientException { + void createProducerWithMatrixOfCacheKeys() { String topic1 = "topic1"; String topic2 = "topic2"; var schema1 = new StringSchema(); @@ -167,7 +168,7 @@ void createProducerWithMatrixOfCacheKeys() throws PulsarClientException { } @Test - void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientException { + void factoryDestroyCleansUpCacheAndClosesProducers() { CachingPulsarProducerFactory producerFactory = producerFactory(pulsarClient, null, null); var actualProducer1 = actualProducer(producerFactory.createProducer(schema, "topic1")); var actualProducer2 = actualProducer(producerFactory.createProducer(schema, "topic2")); @@ -183,7 +184,7 @@ void factoryDestroyCleansUpCacheAndClosesProducers() throws PulsarClientExceptio } @Test - void producerEvictedFromCache() throws PulsarClientException { + void producerEvictedFromCache() { CachingPulsarProducerFactory producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, null, null, new DefaultTopicResolver(), Duration.ofSeconds(3L), 10L, 2); var actualProducer = actualProducer(producerFactory.createProducer(schema, "topic1")); @@ -306,7 +307,21 @@ static Stream equalsAndHashCodeTestProvider() { arguments( Named.of("differentNullInterceptor", new ProducerCacheKey<>(Schema.STRING, "topic1", encryptionKeys1, customizers1)), - new ProducerCacheKey<>(Schema.STRING, "topic1", null, null), false)); + new ProducerCacheKey<>(Schema.STRING, "topic1", null, null), false), + arguments( + Named.of("sameAutoProduceSchemaSameTopic", + new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)), + new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null), true), + arguments( + Named.of("autoProduceSchemaOneWithAndOneWithoutSchemaInfo", + new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)), + new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(UserPojo.class)), "topic1", + null, null), + true), + arguments( + Named.of("sameAutoProduceSchemaDifferentTopic", + new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic1", null, null)), + new ProducerCacheKey<>(Schema.AUTO_PRODUCE_BYTES(), "topic2", null, null), false)); } } @@ -315,7 +330,7 @@ static Stream equalsAndHashCodeTestProvider() { class RestartFactoryTests { @Test - void restartLifecycle() throws PulsarClientException { + void restartLifecycle() { var producerFactory = (CachingPulsarProducerFactory) producerFactory(pulsarClient, null, null); producerFactory.start(); var producer1 = producerFactory.createProducer(schema, "topic1"); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java index e851d0bb9..f0e561ebe 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/PulsarTemplateTests.java @@ -17,6 +17,7 @@ package org.springframework.pulsar.core; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -40,6 +41,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterEach; @@ -53,8 +55,11 @@ import org.junit.jupiter.params.provider.ValueSource; import org.springframework.pulsar.test.support.PulsarTestContainerSupport; +import org.springframework.pulsar.test.support.model.UserRecord; import org.springframework.util.function.ThrowingConsumer; +import com.fasterxml.jackson.databind.ObjectMapper; + /** * Tests for {@link PulsarTemplate}. * @@ -170,8 +175,8 @@ void sendMessageWithMessageCustomizer() throws Exception { ThrowingConsumer> sendFunction = (template) -> template.newMessage("test-message") .withMessageCustomizer((mb) -> mb.key("test-key")) .send(); - Message msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING, - "test-message", true); + Message msg = sendAndConsume(sendFunction, "sendMessageWithMessageCustomizer", Schema.STRING, "test-message", + true); assertThat(msg.getKey()).isEqualTo("test-key"); } @@ -180,8 +185,8 @@ void sendMessageWithSenderCustomizer() throws Exception { ThrowingConsumer> sendFunction = (template) -> template.newMessage("test-message") .withProducerCustomizer((sb) -> sb.producerName("test-producer")) .send(); - Message msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING, - "test-message", true); + Message msg = sendAndConsume(sendFunction, "sendMessageWithSenderCustomizer", Schema.STRING, "test-message", + true); assertThat(msg.getProducerName()).isEqualTo("test-producer"); } @@ -241,22 +246,24 @@ void sendMessageWithoutTopicFails() { .withMessage("Topic must be specified when no default topic is configured"); } - private Message sendAndConsume(ThrowingConsumer> sendFunction, String topic, - Schema schema, T expectedValue, Boolean withDefaultTopic) throws Exception { + private Message sendAndConsume(ThrowingConsumer> sendFunction, String topic, + Schema consumingSchema, V expectedValue, Boolean withDefaultTopic) throws Exception { PulsarProducerFactory senderFactory = new DefaultPulsarProducerFactory<>(client, withDefaultTopic ? topic : null); PulsarTemplate pulsarTemplate = new PulsarTemplate<>(senderFactory); - return sendAndConsume(pulsarTemplate, sendFunction, topic, schema, expectedValue); + return sendAndConsume(pulsarTemplate, sendFunction, topic, consumingSchema, expectedValue); } - private Message sendAndConsume(PulsarTemplate template, ThrowingConsumer> sendFunction, - String topic, Schema schema, T expectedValue) throws Exception { - try (org.apache.pulsar.client.api.Consumer consumer = client.newConsumer(schema) + private Message sendAndConsume(PulsarTemplate template, + ThrowingConsumer> sendFunction, String topic, Schema schema, V expectedValue) + throws Exception { + try (org.apache.pulsar.client.api.Consumer consumer = client.newConsumer(schema) .topic(topic) .subscriptionName(topic + "-sub") .subscribe()) { sendFunction.accept(template); - Message msg = consumer.receive(3, TimeUnit.SECONDS); + Message msg = consumer.receive(3, TimeUnit.SECONDS); + consumer.acknowledge(msg); assertThat(msg).isNotNull(); assertThat(msg.getValue()).isEqualTo(expectedValue); return msg; @@ -322,6 +329,38 @@ void sendNullWithoutSchemaFails() { } + @Nested + class SendAutoProduceSchemaTests { + + @Test + void withJsonSchema() throws Exception { + var topic = "ptt-auto-json-topic"; + + // First send to the topic as JSON to establish the schema for the topic + var userJsonSchema = Schema.JSON(UserRecord.class); + var user = new UserRecord("Jason", 5150); + ThrowingConsumer> sendAsUserFunction = (template) -> template.send(user, + userJsonSchema); + sendAndConsume(sendAsUserFunction, topic, userJsonSchema, user, true); + + // Next send another user using byte[] with AUTO_PRODUCE - it should be + // consumed fine + var user2 = new UserRecord("Who", 6160); + var user2Bytes = new ObjectMapper().writeValueAsBytes(user2); + ThrowingConsumer> sendAsBytesFunction = (template) -> template.send(user2Bytes, + Schema.AUTO_PRODUCE_BYTES()); + sendAndConsume(sendAsBytesFunction, topic, userJsonSchema, user2, true); + + // Finally send another user using byte[] with AUTO_PRODUCE w/ invalid payload + // - it should be rejected + var bytesProducerFactory = new DefaultPulsarProducerFactory(client, topic); + var bytesTemplate = new PulsarTemplate<>(bytesProducerFactory); + assertThatExceptionOfType(SchemaSerializationException.class) + .isThrownBy(() -> bytesTemplate.send("invalid-payload".getBytes(), Schema.AUTO_PRODUCE_BYTES())); + } + + } + public static class Foo { private String foo;