Skip to content

Add support for JSON streams to Kotlin Serialization #32074

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package org.springframework.http.codec;

import java.util.List;
import java.util.Map;
import java.util.*;

import kotlinx.serialization.KSerializer;
import kotlinx.serialization.StringFormat;
Expand Down Expand Up @@ -95,13 +94,20 @@ public List<MimeType> getDecodableMimeTypes(ResolvableType targetType) {
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType,
@Nullable Map<String, Object> hints) {
return Flux.error(new UnsupportedOperationException());
return Flux.defer(() -> {
KSerializer<Object> serializer = serializer(elementType);
if (serializer == null) {
return Mono.error(new DecodingException("Could not find KSerializer for " + elementType));
}
return this.stringDecoder
.decode(inputStream, elementType, mimeType, hints)
.map(string -> format().decodeFromString(serializer, string));
});
}

@Override
public Mono<Object> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {

return Mono.defer(() -> {
KSerializer<Object> serializer = serializer(elementType);
if (serializer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package org.springframework.http.codec;

import java.util.List;
import java.util.Map;
import java.util.*;

import kotlin.text.Charsets;
import kotlinx.serialization.KSerializer;
import kotlinx.serialization.StringFormat;
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -49,11 +50,17 @@ public abstract class KotlinSerializationStringEncoder<T extends StringFormat> e

// CharSequence encoding needed for now, see https://github.com/Kotlin/kotlinx.serialization/issues/204 for more details
private final CharSequenceEncoder charSequenceEncoder = CharSequenceEncoder.allMimeTypes();
private final Set<MimeType> streamingMediaTypes = new HashSet<>();

protected KotlinSerializationStringEncoder(T format, MimeType... supportedMimeTypes) {
super(format, supportedMimeTypes);
}

public void setStreamingMediaTypes(Collection<MediaType> streamingMediaTypes) {
this.streamingMediaTypes.clear();
this.streamingMediaTypes.addAll(streamingMediaTypes);
}

@Override
public boolean canEncode(ResolvableType elementType, @Nullable MimeType mimeType) {
return canSerialize(elementType, mimeType);
Expand All @@ -79,13 +86,17 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
.map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
.flux();
}
else {
ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);

if (mimeType != null && streamingMediaTypes.contains(mimeType)) {
return Flux.from(inputStream)
.collectList()
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
.flux();
.map(list -> encodeValue(list, bufferFactory, elementType, mimeType, hints).write("\n", Charsets.UTF_8));
}

ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType);
return Flux.from(inputStream)
.collectList()
.map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints))
.flux();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public KotlinSerializationJsonDecoder() {
}

public KotlinSerializationJsonDecoder(Json json) {
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"));
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"),
MediaType.APPLICATION_NDJSON);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.springframework.http.MediaType;
import org.springframework.http.codec.KotlinSerializationStringEncoder;

import java.util.List;

/**
* Encode from an {@code Object} stream to a byte stream of JSON objects using
* <a href="https://github.com/Kotlin/kotlinx.serialization">kotlinx.serialization</a>.
Expand All @@ -42,7 +44,9 @@ public KotlinSerializationJsonEncoder() {
}

public KotlinSerializationJsonEncoder(Json json) {
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"));
super(json, MediaType.APPLICATION_JSON, new MediaType("application", "*+json"),
MediaType.APPLICATION_NDJSON);
setStreamingMediaTypes(List.of(MediaType.APPLICATION_NDJSON));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.testfixture.codec.AbstractDecoderTests
import org.springframework.http.MediaType
import org.springframework.http.customJson
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier
import java.math.BigDecimal
Expand All @@ -45,12 +46,16 @@ class CustomKotlinSerializationJsonDecoderTests :

@Test
override fun decode() {
val output = decoder.decode(Mono.empty(),
ResolvableType.forClass(KotlinSerializationJsonDecoderTests.Pojo::class.java), null, emptyMap())
val input = Flux.concat(
stringBuffer("1.0\n"),
stringBuffer("2.0\n")
)
val output = decoder.decode(input, ResolvableType.forClass(BigDecimal::class.java), null, emptyMap())
StepVerifier
.create(output)
.expectError(UnsupportedOperationException::class.java)
.verify()
.expectNext(BigDecimal.valueOf(1.0))
.expectNext(BigDecimal.valueOf(2.0))
.verifyComplete()
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class KotlinSerializationJsonDecoderTests : AbstractDecoderTests<KotlinSerializa
assertThat(decoder.canDecode(ResolvableType.forClass(Ordered::class.java), MediaType.APPLICATION_JSON)).isFalse()
assertThat(decoder.canDecode(ResolvableType.NONE, MediaType.APPLICATION_JSON)).isFalse()
assertThat(decoder.canDecode(ResolvableType.forClass(BigDecimal::class.java), MediaType.APPLICATION_JSON)).isFalse()

assertThat(decoder.canDecode(ResolvableType.forClass(Pojo::class.java), MediaType.APPLICATION_NDJSON)).isTrue()
}

@Test
Expand All @@ -73,6 +75,23 @@ class KotlinSerializationJsonDecoderTests : AbstractDecoderTests<KotlinSerializa
.expectError(UnsupportedOperationException::class.java)
}


@Test
fun decodeStream() {
val input = Flux.concat(
stringBuffer("{\"bar\":\"b1\",\"foo\":\"f1\"}\n"),
stringBuffer("{\"bar\":\"b2\",\"foo\":\"f2\"}\n")
)

testDecodeAll(input, ResolvableType.forClass(Pojo::class.java), { step: FirstStep<Any> ->
step
.expectNext(Pojo("f1", "b1"))
.expectNext(Pojo("f2", "b2"))
.expectComplete()
.verify()
}, null, null)
}

@Test
override fun decodeToMono() {
val input = Flux.concat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
assertThat(encoder.canEncode(ResolvableType.forClassWithGenerics(ArrayList::class.java, Int::class.java), MediaType.APPLICATION_JSON)).isTrue()
assertThat(encoder.canEncode(ResolvableType.forClassWithGenerics(ArrayList::class.java, Int::class.java), MediaType.APPLICATION_PDF)).isFalse()
assertThat(encoder.canEncode(ResolvableType.NONE, MediaType.APPLICATION_JSON)).isFalse()

assertThat(encoder.canEncode(pojoType, MediaType.APPLICATION_NDJSON)).isTrue()
}

@Test
override fun encode() {
val input = Flux.just(
Pojo("foo", "bar"),
Pojo("foofoo", "barbar"),
Pojo("foofoofoo", "barbarbar")
Pojo("foo", "bar"),
Pojo("foofoo", "barbar"),
Pojo("foofoofoo", "barbarbar")
)
testEncode(input, Pojo::class.java, { step: FirstStep<DataBuffer?> -> step
.consumeNextWith(expectString("[" +
Expand All @@ -76,6 +78,26 @@ class KotlinSerializationJsonEncoderTests : AbstractEncoderTests<KotlinSerializa
.verifyComplete()
})
}
@Test
fun encodeStream() {
val input = Flux.just(
Pojo("foo", "bar"),
Pojo("foofoo", "barbar"),
Pojo("foofoofoo", "barbarbar")
)
testEncodeAll(
input,
ResolvableType.forClass(Pojo::class.java),
MediaType.APPLICATION_NDJSON,
null
) { step: FirstStep<DataBuffer?> ->
step
.consumeNextWith(expectString("{\"foo\":\"foo\",\"bar\":\"bar\"}\n"))
.consumeNextWith(expectString("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n"))
.consumeNextWith(expectString("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n"))
.verifyComplete()
}
}

@Test
fun encodeMono() {
Expand Down