/*
- * Copyright 2002-2018 the original author or authors.
+ * Copyright 2002-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
+ import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import reactor.core.publisher.Flux;

import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
- import org.springframework.util.Assert;

/**
 * {@link Function} to transform a JSON stream of arbitrary size, byte array
 * chunks into a {@code Flux<TokenBuffer>} where each token buffer is a
 * well-formed JSON object.
 *
 * @author Arjen Poutsma
+ * @author Rossen Stoyanchev
+ * @author Juergen Hoeller
 * @since 5.0
 */
final class Jackson2Tokenizer {
@@ -59,36 +61,15 @@ final class Jackson2Tokenizer {
    private final ByteArrayFeeder inputFeeder;


-     private Jackson2Tokenizer(JsonParser parser, boolean tokenizeArrayElements) {
-         Assert.notNull(parser, "'parser' must not be null");
+     private Jackson2Tokenizer(
+             JsonParser parser, DeserializationContext deserializationContext, boolean tokenizeArrayElements) {

        this.parser = parser;
        this.tokenizeArrayElements = tokenizeArrayElements;
-         this.tokenBuffer = new TokenBuffer(parser);
+         this.tokenBuffer = new TokenBuffer(parser, deserializationContext);
        this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
    }

-     /**
-      * Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
-      * @param dataBuffers the source data buffers
-      * @param jsonFactory the factory to use
-      * @param tokenizeArrayElements if {@code true} and the "top level" JSON
-      * object is an array, each element is returned individually, immediately
-      * after it is received.
-      * @return the result token buffers
-      */
-     public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
-             boolean tokenizeArrayElements) {
-
-         try {
-             JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
-             Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, tokenizeArrayElements);
-             return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput);
-         }
-         catch (IOException ex) {
-             return Flux.error(ex);
-         }
-     }

    private Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) {
        byte[] bytes = new byte[dataBuffer.readableByteCount()];
@@ -100,8 +81,7 @@ private Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) {
            return parseTokenBufferFlux();
        }
        catch (JsonProcessingException ex) {
-             return Flux.error(new DecodingException(
-                     "JSON decoding error: " + ex.getOriginalMessage(), ex));
+             return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
        }
        catch (IOException ex) {
            return Flux.error(ex);
@@ -114,8 +94,7 @@ private Flux<TokenBuffer> endOfInput() {
            return parseTokenBufferFlux();
        }
        catch (JsonProcessingException ex) {
-             return Flux.error(new DecodingException(
-                     "JSON decoding error: " + ex.getOriginalMessage(), ex));
+             return Flux.error(new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex));
        }
        catch (IOException ex) {
            return Flux.error(ex);
@@ -128,12 +107,11 @@ private Flux<TokenBuffer> parseTokenBufferFlux() throws IOException {
        while (true) {
            JsonToken token = this.parser.nextToken();
            // SPR-16151: Smile data format uses null to separate documents
-             if ((token == JsonToken.NOT_AVAILABLE) ||
+             if (token == JsonToken.NOT_AVAILABLE ||
                    (token == null && (token = this.parser.nextToken()) == null)) {
                break;
            }
            updateDepth(token);
-
            if (!this.tokenizeArrayElements) {
                processTokenNormal(token, result);
            }
@@ -164,8 +142,7 @@ private void updateDepth(JsonToken token) {
    private void processTokenNormal(JsonToken token, List<TokenBuffer> result) throws IOException {
        this.tokenBuffer.copyCurrentEvent(this.parser);

-         if ((token.isStructEnd() || token.isScalarValue()) &&
-                 this.objectDepth == 0 && this.arrayDepth == 0) {
+         if ((token.isStructEnd() || token.isScalarValue()) && this.objectDepth == 0 && this.arrayDepth == 0) {
            result.add(this.tokenBuffer);
            this.tokenBuffer = new TokenBuffer(this.parser);
        }
@@ -177,8 +154,7 @@ private void processTokenArray(JsonToken token, List<TokenBuffer> result) throws
            this.tokenBuffer.copyCurrentEvent(this.parser);
        }

-         if (this.objectDepth == 0 &&
-                 (this.arrayDepth == 0 || this.arrayDepth == 1) &&
+         if (this.objectDepth == 0 && (this.arrayDepth == 0 || this.arrayDepth == 1) &&
                (token == JsonToken.END_OBJECT || token.isScalarValue())) {
            result.add(this.tokenBuffer);
            this.tokenBuffer = new TokenBuffer(this.parser);
@@ -190,4 +166,26 @@ private boolean isTopLevelArrayToken(JsonToken token) {
                (token == JsonToken.END_ARRAY && this.arrayDepth == 0));
    }

+
+     /**
+      * Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
+      * @param dataBuffers the source data buffers
+      * @param jsonFactory the factory to use
+      * @param tokenizeArrayElements if {@code true} and the "top level" JSON object is
+      * an array, each element is returned individually immediately after it is received
+      * @return the resulting token buffers
+      */
+     public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
+             DeserializationContext deserializationContext, boolean tokenizeArrayElements) {
+
+         try {
+             JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
+             Jackson2Tokenizer tokenizer = new Jackson2Tokenizer(parser, deserializationContext, tokenizeArrayElements);
+             return dataBuffers.flatMap(tokenizer::tokenize, Flux::error, tokenizer::endOfInput);
+         }
+         catch (IOException ex) {
+             return Flux.error(ex);
+         }
+     }
+
}
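
For reference, the following is a minimal sketch of how the updated tokenize signature could be driven end to end. It is illustrative only: Jackson2Tokenizer is package-private (AbstractJackson2Decoder is its real caller), so the snippet assumes it is compiled in the same package, and the TokenizeSketch class name and sample payload are invented for the example, not part of this commit.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import reactor.core.publisher.Flux;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

public class TokenizeSketch {

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory();

        // A top-level JSON array split across arbitrary byte chunks, as it might
        // arrive from the network.
        Flux<DataBuffer> chunks = Flux.just("[{\"a\":1},", "{\"b\":2}]")
                .map(json -> bufferFactory.wrap(json.getBytes(StandardCharsets.UTF_8)));

        // New signature: the ObjectMapper's DeserializationContext is passed through
        // to each TokenBuffer; tokenizeArrayElements=true emits one buffer per array
        // element instead of one buffer for the whole array.
        Flux<TokenBuffer> tokens = Jackson2Tokenizer.tokenize(
                chunks, mapper.getFactory(), mapper.getDeserializationContext(), true);

        tokens.map(buffer -> {
            try {
                // Each TokenBuffer holds one well-formed JSON object.
                return mapper.readValue(buffer.asParser(mapper), Map.class);
            }
            catch (IOException ex) {
                throw new IllegalStateException(ex);
            }
        }).subscribe(System.out::println);  // prints {a=1} then {b=2}
    }
}

Threading the DeserializationContext into TokenBuffer appears intended to let buffered tokens honor the mapper's deserialization configuration rather than only the parser's own features, which would explain why both the constructor and the static factory gained the extra argument.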