|
35 | 35 | import com.fasterxml.jackson.databind.SequenceWriter;
|
36 | 36 | import com.fasterxml.jackson.databind.exc.InvalidDefinitionException;
|
37 | 37 | import com.fasterxml.jackson.databind.ser.FilterProvider;
|
| 38 | +import org.apache.commons.logging.Log; |
38 | 39 | import org.reactivestreams.Publisher;
|
39 | 40 | import reactor.core.publisher.Flux;
|
40 | 41 | import reactor.core.publisher.Mono;
|
@@ -70,6 +71,8 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple
|
70 | 71 |
|
71 | 72 | private static final byte[] NEWLINE_SEPARATOR = {'\n'};
|
72 | 73 |
|
| 74 | + private static final byte[] EMPTY_BYTES = new byte[0]; |
| 75 | + |
73 | 76 | private static final Map<String, JsonEncoding> ENCODINGS;
|
74 | 77 |
|
75 | 78 | static {
|
@@ -150,45 +153,47 @@ public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory buffe
|
150 | 153 | .map(value -> encodeValue(value, bufferFactory, elementType, mimeType, hints))
|
151 | 154 | .flux();
|
152 | 155 | }
|
153 |
| - else { |
| 156 | + |
| 157 | + try { |
| 158 | + ObjectMapper mapper = selectObjectMapper(elementType, mimeType); |
| 159 | + if (mapper == null) { |
| 160 | + throw new IllegalStateException("No ObjectMapper for " + elementType); |
| 161 | + } |
| 162 | + ObjectWriter writer = createObjectWriter(mapper, elementType, mimeType, null, hints); |
| 163 | + ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler()); |
| 164 | + JsonEncoding encoding = getJsonEncoding(mimeType); |
| 165 | + JsonGenerator generator = mapper.getFactory().createGenerator(byteBuilder, encoding); |
| 166 | + SequenceWriter sequenceWriter = writer.writeValues(generator); |
| 167 | + |
154 | 168 | byte[] separator = getStreamingMediaTypeSeparator(mimeType);
|
155 |
| - if (separator != null) { // streaming |
156 |
| - try { |
157 |
| - ObjectMapper mapper = selectObjectMapper(elementType, mimeType); |
158 |
| - if (mapper == null) { |
159 |
| - throw new IllegalStateException("No ObjectMapper for " + elementType); |
160 |
| - } |
161 |
| - ObjectWriter writer = createObjectWriter(mapper, elementType, mimeType, null, hints); |
162 |
| - ByteArrayBuilder byteBuilder = new ByteArrayBuilder(writer.getFactory()._getBufferRecycler()); |
163 |
| - JsonEncoding encoding = getJsonEncoding(mimeType); |
164 |
| - JsonGenerator generator = mapper.getFactory().createGenerator(byteBuilder, encoding); |
165 |
| - SequenceWriter sequenceWriter = writer.writeValues(generator); |
166 |
| - |
167 |
| - return Flux.from(inputStream) |
168 |
| - .map(value -> encodeStreamingValue(value, bufferFactory, hints, sequenceWriter, byteBuilder, |
169 |
| - separator)) |
170 |
| - .doAfterTerminate(() -> { |
171 |
| - try { |
172 |
| - byteBuilder.release(); |
173 |
| - generator.close(); |
174 |
| - } |
175 |
| - catch (IOException ex) { |
176 |
| - logger.error("Could not close Encoder resources", ex); |
177 |
| - } |
178 |
| - }); |
179 |
| - } |
180 |
| - catch (IOException ex) { |
181 |
| - return Flux.error(ex); |
182 |
| - } |
| 169 | + Flux<DataBuffer> dataBufferFlux; |
| 170 | + |
| 171 | + if (separator != null) { |
| 172 | + dataBufferFlux = Flux.from(inputStream).map(value -> encodeStreamingValue( |
| 173 | + value, bufferFactory, hints, sequenceWriter, byteBuilder, EMPTY_BYTES, separator)); |
183 | 174 | }
|
184 |
| - else { // non-streaming |
185 |
| - ResolvableType listType = ResolvableType.forClassWithGenerics(List.class, elementType); |
186 |
| - return Flux.from(inputStream) |
187 |
| - .collectList() |
188 |
| - .map(list -> encodeValue(list, bufferFactory, listType, mimeType, hints)) |
189 |
| - .flux(); |
| 175 | + else { |
| 176 | + JsonArrayJoinHelper helper = new JsonArrayJoinHelper(); |
| 177 | + return Flux.concat( |
| 178 | + helper.getPrefix(bufferFactory, hints, logger), |
| 179 | + Flux.from(inputStream).map(value -> encodeStreamingValue( |
| 180 | + value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)), |
| 181 | + helper.getSuffix(bufferFactory, hints, logger)); |
190 | 182 | }
|
191 | 183 |
|
| 184 | + return dataBufferFlux |
| 185 | + .doAfterTerminate(() -> { |
| 186 | + try { |
| 187 | + byteBuilder.release(); |
| 188 | + generator.close(); |
| 189 | + } |
| 190 | + catch (IOException ex) { |
| 191 | + logger.error("Could not close Encoder resources", ex); |
| 192 | + } |
| 193 | + }); |
| 194 | + } |
| 195 | + catch (IOException ex) { |
| 196 | + return Flux.error(ex); |
192 | 197 | }
|
193 | 198 | }
|
194 | 199 |
|
@@ -247,8 +252,10 @@ public DataBuffer encodeValue(Object value, DataBufferFactory bufferFactory,
|
247 | 252 | }
|
248 | 253 | }
|
249 | 254 |
|
250 |
| - private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints, |
251 |
| - SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, byte[] separator) { |
| 255 | + private DataBuffer encodeStreamingValue( |
| 256 | + Object value, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints, |
| 257 | + SequenceWriter sequenceWriter, ByteArrayBuilder byteArrayBuilder, |
| 258 | + byte[] prefix, byte[] suffix) { |
252 | 259 |
|
253 | 260 | logValue(hints, value);
|
254 | 261 |
|
@@ -280,9 +287,14 @@ private DataBuffer encodeStreamingValue(Object value, DataBufferFactory bufferFa
|
280 | 287 | offset = 0;
|
281 | 288 | length = bytes.length;
|
282 | 289 | }
|
283 |
| - DataBuffer buffer = bufferFactory.allocateBuffer(length + separator.length); |
| 290 | + DataBuffer buffer = bufferFactory.allocateBuffer(length + prefix.length + suffix.length); |
| 291 | + if (prefix.length != 0) { |
| 292 | + buffer.write(prefix); |
| 293 | + } |
284 | 294 | buffer.write(bytes, offset, length);
|
285 |
| - buffer.write(separator); |
| 295 | + if (suffix.length != 0) { |
| 296 | + buffer.write(suffix); |
| 297 | + } |
286 | 298 | Hints.touchDataBuffer(buffer, hints, logger);
|
287 | 299 |
|
288 | 300 | return buffer;
|
@@ -385,4 +397,43 @@ protected <A extends Annotation> A getAnnotation(MethodParameter parameter, Clas
|
385 | 397 | return parameter.getMethodAnnotation(annotType);
|
386 | 398 | }
|
387 | 399 |
|
| 400 | + |
| 401 | + private static class JsonArrayJoinHelper { |
| 402 | + |
| 403 | + private static final byte[] COMMA_SEPARATOR = {','}; |
| 404 | + |
| 405 | + private static final byte[] OPEN_BRACKET = {'['}; |
| 406 | + |
| 407 | + private static final byte[] CLOSE_BRACKET = {']'}; |
| 408 | + |
| 409 | + |
| 410 | + private boolean afterFirstItem = false; |
| 411 | + |
| 412 | + public byte[] getDelimiter() { |
| 413 | + if (this.afterFirstItem) { |
| 414 | + return COMMA_SEPARATOR; |
| 415 | + } |
| 416 | + this.afterFirstItem = true; |
| 417 | + return EMPTY_BYTES; |
| 418 | + } |
| 419 | + |
| 420 | + public Mono<DataBuffer> getPrefix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) { |
| 421 | + return wrapBytes(OPEN_BRACKET, factory, hints, logger); |
| 422 | + } |
| 423 | + |
| 424 | + public Mono<DataBuffer> getSuffix(DataBufferFactory factory, @Nullable Map<String, Object> hints, Log logger) { |
| 425 | + return wrapBytes(CLOSE_BRACKET, factory, hints, logger); |
| 426 | + } |
| 427 | + |
| 428 | + private Mono<DataBuffer> wrapBytes( |
| 429 | + byte[] bytes, DataBufferFactory bufferFactory, @Nullable Map<String, Object> hints, Log logger) { |
| 430 | + |
| 431 | + return Mono.fromCallable(() -> { |
| 432 | + DataBuffer buffer = bufferFactory.wrap(bytes); |
| 433 | + Hints.touchDataBuffer(buffer, hints, logger); |
| 434 | + return buffer; |
| 435 | + }); |
| 436 | + } |
| 437 | + } |
| 438 | + |
388 | 439 | }
|
0 commit comments