Skip to content

NullPointerException in AbstractJavaTypeMapper #652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
mohamnag opened this issue Apr 18, 2018 · 3 comments
Closed

NullPointerException in AbstractJavaTypeMapper #652

mohamnag opened this issue Apr 18, 2018 · 3 comments
Assignees
Milestone

Comments

@mohamnag
Copy link

I'm using the spring-kafka 2.1.5 with kafka-streams 1.0.1 and I'm facing a null pointer with following stack:

java.lang.NullPointerException: null
	at org.springframework.kafka.support.converter.AbstractJavaTypeMapper.addHeader(AbstractJavaTypeMapper.java:146)
	at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.fromJavaType(DefaultJackson2JavaTypeMapper.java:177)
	at org.springframework.kafka.support.serializer.JsonSerializer.serialize(JsonSerializer.java:134)
	at org.apache.kafka.streams.kstream.internals.ChangedSerializer.serialize(ChangedSerializer.java:66)
	at org.apache.kafka.streams.kstream.internals.ChangedSerializer.serialize(ChangedSerializer.java:83)
	at org.apache.kafka.streams.kstream.internals.ChangedSerializer.serialize(ChangedSerializer.java:29)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:91)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
	at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:94)
	at org.apache.kafka.streams.kstream.internals.KTableRepartitionMap$KTableMapProcessor.process(KTableRepartitionMap.java:72)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
	at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:106)
	at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
	at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
	at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:40)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:35)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:79)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
	at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:153)
	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
	at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
	at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
	at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
	at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
	at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
	at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
	at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
	at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
	at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

my streams setup is to re-key incomming messages and group them by new key something like:

streamsBuilder.
                        ... // source the topic with type OriginalType
                        .groupBy(
                                (key, value) -> KeyValue.pair(
                                        value.getNewKeyPart1() + value.getNewKeyPart2(),
                                        value
                                ),
                                Serialized.with(
                                        Serdes.String(),
                                        new JsonSerde<>(SomeNewType.class, objectMapper)
                                )
                        )

after examining the stack, it is clear that Kafka Streams passes null as value for headers (ChangedSerializer.java:83) and you don't account for it being null on AbstractJavaTypeMapper.java:146.

@garyrussell garyrussell self-assigned this Apr 18, 2018
@garyrussell garyrussell added this to the 2.1.6 milestone Apr 18, 2018
@garyrussell
Copy link
Contributor

We can (and will) fix the NPE but since Kafka Streams doesn't support headers, we won't be able to convey the type information in the record so the consumer Serde will need to be configured with the targetType for deserialization.

@mohamnag
Copy link
Author

Thanks for fixing the NPE.

Regarding the target type you're right and that should be done. However in current situation that won't fix the problem (I'm configuring the Serde for consumer just like the one on producer/group operation).
my current workaround is to set addType flag to false by creating the Serde like this:

private <T> Serde<T> createJsonSerde(Class<T> targetType, ObjectMapper objectMapper) {
        JsonSerializer<T> jsonSerializer = new JsonSerializer<>(objectMapper);
        jsonSerializer.setAddTypeInfo(false);

        JsonDeserializer<T> jsonDeserializer = new JsonDeserializer<>(targetType, objectMapper);

        return
                new JsonSerde<>(
                        jsonSerializer,
                        jsonDeserializer
                );
    }

@garyrussell
Copy link
Contributor

Good; I'm glad you have a work-around.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Apr 18, 2018
Fixes: spring-projects#652

Kafka Streams `ChangedSerializer` calls the `Serde` with `null` in `headers`.

**cherry-pick to 2.1.x, 2.0.x, 1.3.x**
garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Apr 18, 2018
Fixes: spring-projects#652

Kafka Streams `ChangedSerializer` calls the `Serde` with `null` in `headers`.

**cherry-pick to 2.1.x, 2.0.x, 1.3.x**
artembilan pushed a commit that referenced this issue Apr 18, 2018
Fixes: #652

Kafka Streams `ChangedSerializer` calls the `Serde` with `null` in `headers`.

**cherry-pick to 2.1.x, 2.0.x, 1.3.x**
artembilan pushed a commit that referenced this issue Apr 18, 2018
Fixes: #652

Kafka Streams `ChangedSerializer` calls the `Serde` with `null` in `headers`.

**cherry-pick to 2.1.x, 2.0.x, 1.3.x**

(cherry picked from commit ad28b91)
denis554 added a commit to denis554/spring-kafka that referenced this issue Mar 27, 2019
Fixes: spring-projects/spring-kafka#652

Kafka Streams `ChangedSerializer` calls the `Serde` with `null` in `headers`.

**cherry-pick to 2.1.x, 2.0.x, 1.3.x**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants